You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/15 17:55:46 UTC

[01/21] incubator-ignite git commit: Changed default timeout: SocketWrite, Ack and Network timeouts to 200.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-866 f56342fa2 -> d87a38c31 (forced update)


Changed default timeout:
SocketWrite, Ack and Network timeouts to 200.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/de1c80e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/de1c80e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/de1c80e4

Branch: refs/heads/ignite-866
Commit: de1c80e4335f39a8888f6db3121b6f8afa6e5fb1
Parents: a4c9653
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue May 12 15:22:19 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue May 12 15:22:19 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/communication/tcp/TcpCommunicationSpi.java    | 2 +-
 .../ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java     | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de1c80e4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2d5c541..fd17791 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -203,7 +203,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
     public static final int DFLT_ACK_SND_THRESHOLD = 16;
 
     /** Default socket write timeout. */
-    public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT;
+    public static final long DFLT_SOCK_WRITE_TIMEOUT = 200;
 
     /** No-op runnable. */
     private static final IgniteRunnable NOOP = new IgniteRunnable() {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de1c80e4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 5d28af2..b7e3cd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -54,11 +54,11 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
     /** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
     public static final long DFLT_SOCK_TIMEOUT = 200;
 
-    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
-    public static final long DFLT_ACK_TIMEOUT = 5000;
+    /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>200ms</tt>). */
+    public static final long DFLT_ACK_TIMEOUT = 200;
 
-    /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
-    public static final long DFLT_NETWORK_TIMEOUT = 5000;
+    /** Default network timeout in milliseconds (value is <tt>200ms</tt>). */
+    public static final long DFLT_NETWORK_TIMEOUT = 200;
 
     /** Default value for thread priority (value is <tt>10</tt>). */
     public static final int DFLT_THREAD_PRI = 10;


[03/21] incubator-ignite git commit: 775 perm to sh files

Posted by sb...@apache.org.
775 perm to sh files


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a8a0062f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a8a0062f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a8a0062f

Branch: refs/heads/ignite-866
Commit: a8a0062f0a13d008204ce015aa2526d609a3776f
Parents: b95d2ae
Author: avinogradov <av...@gridgain.com>
Authored: Tue May 12 20:07:18 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue May 12 20:07:18 2015 +0300

----------------------------------------------------------------------
 pom.xml | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a0062f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0643227..cb790bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -300,6 +300,8 @@
                                             <substitution expression="${project.version}" />
                                         </replaceregexp>
 
+                                        <chmod dir="${basedir}/target/release-package" perm="755" includes="**/*.sh"/>
+
                                         <zip destfile="${basedir}/target/bin/${ignite.zip.pattern}.zip" encoding="UTF-8">
                                             <zipfileset dir="${basedir}/target/release-package" prefix="${ignite.zip.pattern}" filemode="755">
                                                 <include name="**/*.sh" />


[09/21] incubator-ignite git commit: # ignite-669 - streaming design.

Posted by sb...@apache.org.
# ignite-669 - streaming design.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/877eb764
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/877eb764
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/877eb764

Branch: refs/heads/ignite-866
Commit: 877eb7649e61ca53765c42b4244a7d46a39d1179
Parents: be64e1d
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Thu Apr 2 03:11:54 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:21 2015 +0300

----------------------------------------------------------------------
 .../java/org/apache/ignite/stream/adapters/StreamAdapter.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/877eb764/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index f2e0da9..9d4772f 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -69,14 +69,14 @@ public abstract class StreamAdapter<T, K, V> {
     /**
      * @return Provided tuple extractor.
      */
-    public StreamTupleExtractor<T, K, V> getConverter() {
+    public StreamTupleExtractor<T, K, V> getTupleExtractor() {
         return extractor;
     }
 
     /**
      * @param extractor Extractor for key-value tuples from messages.
      */
-    public void setExtractor(StreamTupleExtractor<T, K, V> extractor) {
+    public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
         this.extractor = extractor;
     }
 


[12/21] incubator-ignite git commit: ignite-430 Words count Socket streamer examples

Posted by sb...@apache.org.
ignite-430 Words count Socket streamer examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d87efce0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d87efce0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d87efce0

Branch: refs/heads/ignite-866
Commit: d87efce0af1ad269ea1a4182329f441b6ff32122
Parents: 53995dc
Author: agura <ag...@gridgain.com>
Authored: Wed May 13 18:38:48 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:31 2015 +0300

----------------------------------------------------------------------
 .../streaming/socket/SocketStreamerExample.java | 112 +++++++------------
 .../socket/ZStringsSocketStreamerExample.java   |  76 ++++++-------
 .../examples/streaming/socket/package-info.java |   1 +
 3 files changed, 75 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
index 73cb970..487572a 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
@@ -18,36 +18,30 @@
 package org.apache.ignite.examples.streaming.socket;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
 import org.apache.ignite.stream.adapters.*;
 import org.apache.ignite.stream.socket.*;
 
-import javax.cache.processor.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
 
 /**
- * Streams random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message size based protocol.
+ * <p>
  * To start the example, you should:
  * <ul>
- *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *      <li>Start streaming using {@link SocketStreamerExample}.</li>
- *      <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link SocketStreamerExample}.</li>
+ *     <li>Start querying popular numbers using {@link QueryWords}.</li>
  * </ul>
  * <p>
  * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class SocketStreamerExample {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** Range within which to generate numbers. */
-    private static final int RANGE = 1000;
-
     /** Port. */
     private static final int PORT = 5555;
 
@@ -63,27 +57,13 @@ public class SocketStreamerExample {
                 return;
 
             // The cache is configured with sliding window holding 1 second of the streaming data.
-            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
-            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
-                // Allow data updates.
-                stmr.allowOverwrite(true);
-
-                // Configure data transformation to count instances of the same word.
-                stmr.receiver(new StreamTransformer<Integer, Long>() {
-                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
-                        throws EntryProcessorException {
-                        Long val = e.getValue();
-
-                        e.setValue(val == null ? 1L : val + 1);
-
-                        return null;
-                    }
-                });
+            IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
 
+            try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
                 InetAddress addr = InetAddress.getLocalHost();
 
-                IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+                // Configure socket streamer
+                IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
 
                 sockStmr.setAddr(addr);
 
@@ -93,9 +73,11 @@ public class SocketStreamerExample {
 
                 sockStmr.setStreamer(stmr);
 
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() {
-                    @Override public Map.Entry<Integer, Long> extract(Tuple tuple) {
-                        return new IgniteBiTuple<>(tuple.key, tuple.cnt);
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+                    @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+                        // By using AffinityUuid we ensure that identical
+                        // words are processed on the same cluster node.
+                        return new IgniteBiTuple<>(new AffinityUuid(word), word);
                     }
                 });
 
@@ -114,45 +96,33 @@ public class SocketStreamerExample {
         try (Socket sock = new Socket(addr, port);
              OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
             while (true) {
-                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
-                    Tuple tuple = new Tuple(RAND.nextInt(RANGE), 1L);
-
-                    out.writeObject(tuple);
-
-                    byte[] arr = bos.toByteArray();
-
-                    oos.write(arr.length >>> 24);
-                    oos.write(arr.length >>> 16);
-                    oos.write(arr.length >>> 8);
-                    oos.write(arr.length);
-
-                    oos.write(arr);
+                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+                        for (String word : line.split(" ")) {
+                            if (!word.isEmpty()) {
+                                // Stream words into Ignite through socket.
+                                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+                                    // Write message
+                                    out.writeObject(word);
+
+                                    byte[] arr = bos.toByteArray();
+
+                                    // Write message length
+                                    oos.write(arr.length >>> 24);
+                                    oos.write(arr.length >>> 16);
+                                    oos.write(arr.length >>> 8);
+                                    oos.write(arr.length);
+
+                                    oos.write(arr);
+                                }
+                            }
+                        }
+                    }
                 }
             }
         }
     }
-
-    /**
-     * Tuple.
-     */
-    private static class Tuple implements Serializable {
-        /** Serial version uid. */
-        private static final long serialVersionUID = 0;
-
-        /** Key. */
-        private final int key;
-
-        /** Count. */
-        private final long cnt;
-
-        /**
-         * @param key Key.
-         * @param cnt Count.
-         */
-        public Tuple(int key, long cnt) {
-            this.key = key;
-            this.cnt = cnt;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
index a535c73..fa5aa28 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
@@ -18,40 +18,34 @@
 package org.apache.ignite.examples.streaming.socket;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
 import org.apache.ignite.stream.adapters.*;
 import org.apache.ignite.stream.socket.*;
 
-import javax.cache.processor.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
 
 /**
- * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message delimiter based
+ * protocol.
  * <p>
  * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams
  * zero-terminated strings.
  * <p>
  * To start the example, you should:
  * <ul>
- *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *      <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
- *      <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
+ *     <li>Start querying popular numbers using {@link QueryWords}.</li>
  * </ul>
  * <p>
  * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class ZStringsSocketStreamerExample {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** Range within which to generate numbers. */
-    private static final int RANGE = 1000;
-
     /** Port. */
     private static final int PORT = 5555;
 
@@ -70,27 +64,13 @@ public class ZStringsSocketStreamerExample {
                 return;
 
             // The cache is configured with sliding window holding 1 second of the streaming data.
-            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
-            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
-                // Allow data updates.
-                stmr.allowOverwrite(true);
-
-                // Configure data transformation to count instances of the same word.
-                stmr.receiver(new StreamTransformer<Integer, Long>() {
-                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
-                        throws EntryProcessorException {
-                        Long val = e.getValue();
-
-                        e.setValue(val == null ? 1L : val + 1);
-
-                        return null;
-                    }
-                });
+            IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
 
+            try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
                 InetAddress addr = InetAddress.getLocalHost();
 
-                IgniteSocketStreamer<String, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+                // Configure socket streamer
+                IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
 
                 sockStmr.setAddr(addr);
 
@@ -114,10 +94,11 @@ public class ZStringsSocketStreamerExample {
                     }
                 });
 
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, Integer, Long>() {
-                    @Override public Map.Entry<Integer, Long> extract(String input) {
-                        String[] pair = input.split("=");
-                        return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+                    @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+                        // By using AffinityUuid we ensure that identical
+                        // words are processed on the same cluster node.
+                        return new IgniteBiTuple<>(new AffinityUuid(word), word);
                     }
                 });
 
@@ -137,14 +118,23 @@ public class ZStringsSocketStreamerExample {
              OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
 
             while (true) {
-                int key = RAND.nextInt(RANGE);
-
-                String str = key + "=1";
-
-                byte[] arr = str.getBytes("ASCII");
-
-                oos.write(arr);
-                oos.write(DELIM);
+                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+                        for (String word : line.split(" ")) {
+                            if (!word.isEmpty()) {
+                                // Stream words into Ignite through socket.
+                                byte[] arr = word.getBytes("ASCII");
+
+                                // Write message
+                                oos.write(arr);
+
+                                // Write message delimiter
+                                oos.write(DELIM);
+                            }
+                        }
+                    }
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
index ae7bdf9..d0a480a 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -16,6 +16,7 @@
  */
 
 /**
+ * <!-- Package description. -->
  * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples.
  */
 package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file


[18/21] incubator-ignite git commit: # minor

Posted by sb...@apache.org.
# minor


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7fa8abcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7fa8abcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7fa8abcc

Branch: refs/heads/ignite-866
Commit: 7fa8abcc8aee3fe038cb28e7666b49de6b7be796
Parents: 04774b5f
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 15 17:52:38 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 15 17:52:38 2015 +0300

----------------------------------------------------------------------
 .../streaming/wordcount/socket/WordsSocketStreamerServer.java      | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7fa8abcc/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 6a8911c..9e68096 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -114,7 +114,7 @@ public class WordsSocketStreamerServer {
             sockStmr.start();
         }
         catch (IgniteException e) {
-            System.out.println("Streaming server didn't start due to an error: ");
+            System.err.println("Streaming server didn't start due to an error: ");
 
             e.printStackTrace();
 


[06/21] incubator-ignite git commit: # IGNITE-894 Revert adding @InjectRecursively.

Posted by sb...@apache.org.
# IGNITE-894 Revert adding @InjectRecursively.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/56e67e8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/56e67e8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/56e67e8f

Branch: refs/heads/ignite-866
Commit: 56e67e8f7ce808c190346d9228987f25b54b4e6f
Parents: 593e3ee
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 14 16:38:45 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 14 16:40:00 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridInternalWrapper.java    | 30 ++++++++++++++++
 .../closure/GridClosureProcessor.java           | 25 ++++++++++----
 .../internal/processors/igfs/IgfsJobImpl.java   | 10 ++++--
 .../processors/resource/GridResourceIoc.java    | 36 ++++++++++----------
 .../resource/GridResourceProcessor.java         |  8 ++++-
 .../processors/resource/GridResourceUtils.java  | 15 ++++++++
 .../processors/resource/InjectRecursively.java  | 30 ----------------
 .../resources/META-INF/classnames.properties    |  1 -
 8 files changed, 96 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/GridInternalWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridInternalWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/GridInternalWrapper.java
new file mode 100644
index 0000000..76563e7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridInternalWrapper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ * Internal wrapper interface for custom resource injection logic.
+ */
+public interface GridInternalWrapper<T> {
+    /**
+     * Get user object where resources must be injected.
+     *
+     * @return User object.
+     */
+    public T userObject();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 8f5afbf..658557e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -1584,12 +1584,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private static class C1<T, R> implements ComputeJob, Externalizable {
+    private static class C1<T, R> implements ComputeJob, Externalizable, GridNoImplicitInjection,
+        GridInternalWrapper<IgniteClosure> {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** */
-        @InjectRecursively
         protected IgniteClosure<T, R> job;
 
         /** */
@@ -1635,6 +1635,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
+        @Override public IgniteClosure userObject() {
+            return job;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(C1.class, this);
         }
@@ -1676,12 +1681,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
     /**
      *
      */
-    private static class C2<R> implements ComputeJob, Externalizable {
+    private static class C2<R> implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Callable> {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** */
-        @InjectRecursively
         protected Callable<R> c;
 
         /**
@@ -1724,6 +1728,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
+        @Override public Callable userObject() {
+            return c;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(C2.class, this);
         }
@@ -1763,12 +1772,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
 
     /**
      */
-    private static class C4 implements ComputeJob, Externalizable {
+    private static class C4 implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** */
-        @InjectRecursively
         protected Runnable r;
 
         /**
@@ -1808,6 +1816,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
         }
 
         /** {@inheritDoc} */
+        @Override public Runnable userObject() {
+            return r;
+        }
+
+        /** {@inheritDoc} */
         @Override public String toString() {
             return S.toString(C4.class, this);
         }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java
index 8f2cfd2..fa90e21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java
@@ -21,7 +21,7 @@ import org.apache.ignite.*;
 import org.apache.ignite.compute.*;
 import org.apache.ignite.igfs.*;
 import org.apache.ignite.igfs.mapreduce.*;
-import org.apache.ignite.internal.processors.resource.*;
+import org.apache.ignite.internal.*;
 import org.apache.ignite.resources.*;
 
 import java.io.*;
@@ -29,12 +29,11 @@ import java.io.*;
 /**
  * IGFS job implementation.
  */
-public class IgfsJobImpl implements ComputeJob {
+public class IgfsJobImpl implements ComputeJob, GridInternalWrapper<IgfsJob> {
     /** */
     private static final long serialVersionUID = 0L;
 
     /** IGFS job. */
-    @InjectRecursively
     private IgfsJob job;
 
     /** IGFS name. */
@@ -110,4 +109,9 @@ public class IgfsJobImpl implements ComputeJob {
     @Override public void cancel() {
         job.cancel();
     }
+
+    /** {@inheritDoc} */
+    @Override public IgfsJob userObject() {
+        return job;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index ce19664..1e85ecd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -272,29 +272,29 @@ class GridResourceIoc {
 
             for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
                 for (Field field : cls0.getDeclaredFields()) {
-                    InjectRecursively injectRecursively = field.getAnnotation(InjectRecursively.class);
+                    Annotation[] fieldAnns = field.getAnnotations();
 
-                    if (injectRecursively != null
-                        || (allowImplicitInjection && field.getName().startsWith("this$")
-                            || field.getName().startsWith("val$"))) {
-                        field.setAccessible(true);
+                    for (Annotation ann : fieldAnns) {
+                        T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
 
-                        recursiveFieldsList.add(field);
-                    }
-                    else {
-                        for (Annotation ann : field.getAnnotations()) {
-                            T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
+                        if (t2 == null) {
+                            t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
+                                new ArrayList<GridResourceField>(),
+                                new ArrayList<GridResourceMethod>());
 
-                            if (t2 == null) {
-                                t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
-                                    new ArrayList<GridResourceField>(),
-                                    new ArrayList<GridResourceMethod>());
+                            annMap.put(ann.annotationType(), t2);
+                        }
 
-                                annMap.put(ann.annotationType(), t2);
-                            }
+                        t2.get1().add(new GridResourceField(field, ann));
+                    }
 
-                            t2.get1().add(new GridResourceField(field, ann));
-                        }
+                    if (allowImplicitInjection
+                        && fieldAnns.length == 0
+                        && GridResourceUtils.mayRequireResources(field)) {
+                        field.setAccessible(true);
+
+                        // Account for anonymous inner classes.
+                        recursiveFieldsList.add(field);
                     }
                 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index 5b51592..f5ba492 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -23,7 +23,6 @@ import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.managers.deployment.*;
 import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.*;
 import org.apache.ignite.lifecycle.*;
 import org.apache.ignite.resources.*;
@@ -281,6 +280,13 @@ public class GridResourceProcessor extends GridProcessorAdapter {
         Object obj = unwrapTarget(job);
 
         injectToJob(dep, taskCls, obj, ses, jobCtx);
+
+        if (obj instanceof GridInternalWrapper) {
+            Object usrObj = ((GridInternalWrapper)obj).userObject();
+
+            if (usrObj != null)
+                injectToJob(dep, taskCls, usrObj, ses, jobCtx);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java
index 254f171..660d6ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java
@@ -88,4 +88,19 @@ final class GridResourceUtils {
                 ", target=" + target + ", rsrc=" + rsrc + ']', e);
         }
     }
+
+    /**
+     * Checks if specified field requires recursive inspection to find resource annotations.
+     *
+     * @param f Field.
+     * @return {@code true} if requires, {@code false} if doesn't.
+     */
+    static boolean mayRequireResources(Field f) {
+        assert f != null;
+
+        // Need to inspect anonymous classes, callable and runnable instances.
+        return f.getName().startsWith("this$") || f.getName().startsWith("val$") ||
+            Callable.class.isAssignableFrom(f.getType()) || Runnable.class.isAssignableFrom(f.getType()) ||
+            IgniteClosure.class.isAssignableFrom(f.getType());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/InjectRecursively.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/InjectRecursively.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/InjectRecursively.java
deleted file mode 100644
index 383ee03..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/InjectRecursively.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.resource;
-
-import java.lang.annotation.*;
-
-/**
- * Indicates that resource injection should be performed for field value too.
- */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.FIELD)
-public @interface InjectRecursively {
-    // No-op.
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index f130840..b3eed46 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1140,7 +1140,6 @@ org.apache.ignite.internal.util.lang.GridAbsClosure
 org.apache.ignite.internal.util.lang.GridAbsClosureX
 org.apache.ignite.internal.util.lang.GridCloseableIterator
 org.apache.ignite.internal.util.lang.GridClosureException
-org.apache.ignite.internal.util.lang.GridComputeJobWrapper
 org.apache.ignite.internal.util.lang.GridFunc$1
 org.apache.ignite.internal.util.lang.GridFunc$10
 org.apache.ignite.internal.util.lang.GridFunc$100


[05/21] incubator-ignite git commit: #ignite-373: Cache is not empty after removeAll.

Posted by sb...@apache.org.
#ignite-373: Cache is not empty after removeAll.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/593e3eee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/593e3eee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/593e3eee

Branch: refs/heads/ignite-866
Commit: 593e3eeeb0d4965b1c1a83d4f68a9d18e6615632
Parents: 7c91389
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:35:27 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:35:27 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/GridCacheAdapter.java      | 119 +++++------
 .../GridDistributedCacheAdapter.java            | 210 ++++++++++++-------
 .../cache/CacheRemoveAllSelfTest.java           |  81 +++++++
 .../near/NoneRebalanceModeSelfTest.java         |  67 ++++++
 .../testsuites/IgniteCacheTestSuite2.java       |   1 +
 .../testsuites/IgniteCacheTestSuite4.java       |   2 +
 6 files changed, 338 insertions(+), 142 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3826bfa..4106cb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1133,7 +1133,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
             ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get();
+            ctx.kernalContext().task().execute(
+                new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null).get();
         }
     }
 
@@ -1152,7 +1153,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         if (!nodes.isEmpty()) {
             ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null);
+            return ctx.kernalContext().task().execute(
+                new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null);
         }
         else
             return new GridFinishedFuture<>();
@@ -3571,7 +3573,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-        return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null);
+        return ctx.kernalContext().task().execute(
+            new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null);
     }
 
     /** {@inheritDoc} */
@@ -4827,13 +4830,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         private static final long serialVersionUID = 0L;
 
         /**
-         * Empty constructor for serialization.
-         */
-        public GlobalClearAllJob() {
-            // No-op.
-        }
-
-        /**
          * @param cacheName Cache name.
          * @param topVer Affinity topology version.
          */
@@ -4859,14 +4855,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         private static final long serialVersionUID = 0L;
 
         /** Keys to remove. */
-        private Set<? extends K> keys;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public GlobalClearKeySetJob() {
-            // No-op.
-        }
+        private final Set<? extends K> keys;
 
         /**
          * @param cacheName Cache name.
@@ -4897,14 +4886,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         private static final long serialVersionUID = 0L;
 
         /** Peek modes. */
-        private CachePeekMode[] peekModes;
-
-        /**
-         * Required by {@link Externalizable}.
-         */
-        public SizeJob() {
-            // No-op.
-        }
+        private final CachePeekMode[] peekModes;
 
         /**
          * @param cacheName Cache name.
@@ -5514,17 +5496,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         protected Ignite ignite;
 
         /** Affinity topology version. */
-        protected AffinityTopologyVersion topVer;
+        protected final AffinityTopologyVersion topVer;
 
         /** Cache name. */
-        protected String cacheName;
-
-        /**
-         * Empty constructor for serialization.
-         */
-        public TopologyVersionAwareJob() {
-            // No-op.
-        }
+        protected final String cacheName;
 
         /**
          * @param cacheName Cache name.
@@ -5583,24 +5558,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache context. */
-        private GridCacheContext ctx;
+        /** Cache name. */
+        private final String cacheName;
 
-        /** Peek modes. */
-        private CachePeekMode[] peekModes;
+        /** Affinity topology version. */
+        private final AffinityTopologyVersion topVer;
 
-        /**
-         * Empty constructor for serialization.
-         */
-        public SizeTask() {
-            // No-op.
-        }
+        /** Peek modes. */
+        private final CachePeekMode[] peekModes;
 
         /**
-         * @param ctx Cache context.
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param peekModes Cache peek modes.
          */
-        public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) {
-            this.ctx = ctx;
+        public SizeTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
             this.peekModes = peekModes;
         }
 
@@ -5610,13 +5584,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             Map<ComputeJob, ClusterNode> jobs = new HashMap();
 
             for (ClusterNode node : subgrid)
-                jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node);
+                jobs.put(new SizeJob(cacheName, topVer, peekModes), node);
 
             return jobs;
         }
 
         /** {@inheritDoc} */
         @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            IgniteException e = res.getException();
+
+            if (e != null) {
+                if (e instanceof ClusterTopologyException)
+                    return ComputeJobResultPolicy.WAIT;
+
+                throw new IgniteException("Remote job threw exception.", e);
+            }
+
             return ComputeJobResultPolicy.WAIT;
         }
 
@@ -5640,25 +5623,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
         /** */
         private static final long serialVersionUID = 0L;
 
-        /** Cache context. */
-        private GridCacheContext ctx;
+        /** Cache name. */
+        private final String cacheName;
 
-        /** Keys to clear. */
-        private Set<? extends K> keys;
+        /** Affinity topology version. */
+        private final AffinityTopologyVersion topVer;
 
-        /**
-         * Empty constructor for serialization.
-         */
-        public ClearTask() {
-            // No-op.
-        }
+        /** Keys to clear. */
+        private final Set<? extends K> keys;
 
         /**
-         * @param ctx Cache context.
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
          * @param keys Keys to clear.
          */
-        public ClearTask(GridCacheContext ctx, Set<? extends K> keys) {
-            this.ctx = ctx;
+        public ClearTask(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
             this.keys = keys;
         }
 
@@ -5668,9 +5649,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
             Map<ComputeJob, ClusterNode> jobs = new HashMap();
 
             for (ClusterNode node : subgrid) {
-                jobs.put(keys == null ?
-                        new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion()) :
-                        new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys),
+                jobs.put(keys == null ? new GlobalClearAllJob(cacheName, topVer) :
+                        new GlobalClearKeySetJob<K>(cacheName, topVer, keys),
                     node);
             }
 
@@ -5679,6 +5659,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
         /** {@inheritDoc} */
         @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            IgniteException e = res.getException();
+
+            if (e != null) {
+                if (e instanceof ClusterTopologyException)
+                    return ComputeJobResultPolicy.WAIT;
+
+                throw new IgniteException("Remote job threw exception.", e);
+            }
+
             return ComputeJobResultPolicy.WAIT;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 3a685cc..c5ef22f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
 import org.apache.ignite.internal.*;
 import org.apache.ignite.internal.cluster.*;
 import org.apache.ignite.internal.processors.affinity.*;
@@ -30,17 +31,17 @@ import org.apache.ignite.internal.processors.cache.version.*;
 import org.apache.ignite.internal.processors.datastreamer.*;
 import org.apache.ignite.internal.processors.task.*;
 import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
 import org.apache.ignite.transactions.*;
 import org.jetbrains.annotations.*;
 
 import java.io.*;
 import java.util.*;
-import java.util.concurrent.*;
 
-import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
 
 /**
  * Distributed cache implementation.
@@ -142,21 +143,28 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
         try {
             AffinityTopologyVersion topVer;
 
+            boolean retry;
+
+            CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+            boolean skipStore = opCtx != null && opCtx.skipStore();
+
             do {
+                retry = false;
+
                 topVer = ctx.affinity().affinityTopologyVersion();
 
                 // Send job to all data nodes.
                 Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
                 if (!nodes.isEmpty()) {
-                    CacheOperationContext opCtx = ctx.operationContextPerCall();
+                    ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-                    ctx.closures().callAsyncNoFailover(BROADCAST,
-                        new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes,
-                        true).get();
+                    retry = !ctx.kernalContext().task().execute(
+                        new RemoveAllTask(ctx.name(), topVer, skipStore), null).get();
                 }
             }
-            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0);
+            while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry);
         }
         catch (ClusterGroupEmptyCheckedException ignore) {
             if (log.isDebugEnabled())
@@ -170,7 +178,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
 
         AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
 
-        removeAllAsync(opFut, topVer);
+        CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+        boolean skipStore = opCtx != null && opCtx.skipStore();
+
+        removeAllAsync(opFut, topVer, skipStore);
 
         return opFut;
     }
@@ -178,27 +190,29 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
     /**
      * @param opFut Future.
      * @param topVer Topology version.
+     * @param skipStore Skip store flag.
      */
-    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) {
+    private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer,
+        final boolean skipStore) {
         Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
 
         if (!nodes.isEmpty()) {
-            CacheOperationContext opCtx = ctx.operationContextPerCall();
+            ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
 
-            IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST,
-                    new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, true);
+            IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(
+                new RemoveAllTask(ctx.name(), topVer, skipStore), null);
 
-            rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
-                @Override public void apply(IgniteInternalFuture<?> fut) {
+            rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+                @Override public void apply(IgniteInternalFuture<Boolean> fut) {
                     try {
-                        fut.get();
+                        boolean retry = !fut.get();
 
                         AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion();
 
-                        if (topVer0.equals(topVer))
+                        if (topVer0.equals(topVer) && !retry)
                             opFut.onDone();
                         else
-                            removeAllAsync(opFut, topVer0);
+                            removeAllAsync(opFut, topVer0, skipStore);
                     }
                     catch (ClusterGroupEmptyCheckedException ignore) {
                         if (log.isDebugEnabled())
@@ -227,97 +241,150 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
     }
 
     /**
-     * Internal callable which performs remove all primary key mappings
-     * operation on a cache with the given name.
+     * Remove task.
      */
     @GridInternal
-    private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
+    private static class RemoveAllTask extends ComputeTaskAdapter<Object, Boolean> {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** Cache name. */
-        private String cacheName;
+        private final String cacheName;
 
-        /** Topology version. */
-        private AffinityTopologyVersion topVer;
+        /** Affinity topology version. */
+        private final AffinityTopologyVersion topVer;
 
         /** Skip store flag. */
-        private boolean skipStore;
-
-        /** Injected grid instance. */
-        @IgniteInstanceResource
-        private Ignite ignite;
+        private final boolean skipStore;
 
         /**
-         * Empty constructor for serialization.
+         * @param cacheName Cache name.
+         * @param topVer Affinity topology version.
+         * @param skipStore Skip store flag.
          */
-        public GlobalRemoveAllCallable() {
-            // No-op.
+        public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) {
+            this.cacheName = cacheName;
+            this.topVer = topVer;
+            this.skipStore = skipStore;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+            @Nullable Object arg) throws IgniteException {
+            Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+            for (ClusterNode node : subgrid)
+                jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node);
+
+            return jobs;
+        }
+
+        /** {@inheritDoc} */
+        @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+            IgniteException e = res.getException();
+
+            if (e != null) {
+                if (e instanceof ClusterTopologyException)
+                    return ComputeJobResultPolicy.WAIT;
+
+                throw new IgniteException("Remote job threw exception.", e);
+            }
+
+            return ComputeJobResultPolicy.WAIT;
+        }
+
+        /** {@inheritDoc} */
+        @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+            for (ComputeJobResult locRes : results) {
+                if (locRes != null && (locRes.getException() != null || !locRes.<Boolean>getData()))
+                    return false;
+            }
+
+            return true;
         }
+    }
+    /**
+     * Internal job which performs remove all primary key mappings
+     * operation on a cache with the given name.
+     */
+    @GridInternal
+    private static class GlobalRemoveAllJob<K,V>  extends TopologyVersionAwareJob {
+        /** */
+        private static final long serialVersionUID = 0L;
+
+        /** Skip store flag. */
+        private final boolean skipStore;
 
         /**
          * @param cacheName Cache name.
          * @param topVer Topology version.
          * @param skipStore Skip store flag.
          */
-        private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
-            this.cacheName = cacheName;
-            this.topVer = topVer;
+        private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
+            super(cacheName, topVer);
+
             this.skipStore = skipStore;
         }
 
-        /**
-         * {@inheritDoc}
-         */
-        @Override public Object call() throws Exception {
-            GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+        /** {@inheritDoc} */
+        @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache0) {
+            GridCacheAdapter cache = ((IgniteKernal) ignite).context().cache().internalCache(cacheName);
 
-            final GridCacheContext<K, V> ctx = cacheAdapter.context();
+            if (cache == null)
+                return true;
 
-            ctx.affinity().affinityReadyFuture(topVer).get();
+            final GridCacheContext<K, V> ctx = cache.context();
 
             ctx.gate().enter();
 
             try {
                 if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
-                    return null; // Ignore this remove request because remove request will be sent again.
+                    return false; // Ignore this remove request because remove request will be sent again.
 
                 GridDhtCacheAdapter<K, V> dht;
                 GridNearCacheAdapter<K, V> near = null;
 
-                if (cacheAdapter instanceof GridNearCacheAdapter) {
-                    near = ((GridNearCacheAdapter<K, V>)cacheAdapter);
+                if (cache instanceof GridNearCacheAdapter) {
+                    near = ((GridNearCacheAdapter<K, V>) cache);
                     dht = near.dht();
                 }
                 else
-                    dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
+                    dht = (GridDhtCacheAdapter<K, V>) cache;
 
                 try (DataStreamerImpl<KeyCacheObject, Object> dataLdr =
-                         (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
-                    ((DataStreamerImpl)dataLdr).maxRemapCount(0);
+                         (DataStreamerImpl) ignite.dataStreamer(cacheName)) {
+                    ((DataStreamerImpl) dataLdr).maxRemapCount(0);
 
                     dataLdr.skipStore(skipStore);
 
                     dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
 
-                    for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
-                        if (!locPart.isEmpty() && locPart.primary(topVer)) {
-                            for (GridDhtCacheEntry o : locPart.entries()) {
-                                if (!o.obsoleteOrDeleted())
-                                    dataLdr.removeDataInternal(o.key());
-                            }
-                        }
-                    }
+                    for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) {
+                        GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
 
-                    Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer);
+                        if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve())
+                            return false;
 
-                    while (it.hasNext())
-                        dataLdr.removeDataInternal(it.next());
+                        try {
+                            if (!locPart.isEmpty()) {
+                                for (GridDhtCacheEntry o : locPart.entries()) {
+                                    if (!o.obsoleteOrDeleted())
+                                        dataLdr.removeDataInternal(o.key());
+                                }
+                            }
 
-                    it = dht.context().swap().swapKeyIterator(true, false, topVer);
+                            GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+                                dht.context().swap().iterator(part);
 
-                    while (it.hasNext())
-                        dataLdr.removeDataInternal(it.next());
+                            if (iter != null) {
+                                for (Map.Entry<byte[], GridCacheSwapEntry> e : iter)
+                                    dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
+                            }
+                        }
+                        finally {
+                            locPart.release();
+                        }
+                    }
                 }
 
                 if (near != null) {
@@ -329,25 +396,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
                     }
                 }
             }
+            catch (IgniteCheckedException e) {
+                throw U.convertException(e);
+            }
             finally {
                 ctx.gate().leave();
             }
 
-            return null;
-        }
-
-        /** {@inheritDoc} */
-        @Override public void writeExternal(ObjectOutput out) throws IOException {
-            U.writeString(out, cacheName);
-            out.writeObject(topVer);
-            out.writeBoolean(skipStore);
-        }
-
-        /** {@inheritDoc} */
-        @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            cacheName = U.readString(in);
-            topVer = (AffinityTopologyVersion)in.readObject();
-            skipStore = in.readBoolean();
+            return true;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
new file mode 100644
index 0000000..f5de96f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Test remove all method.
+ */
+public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
+    /** {@inheritDoc} */
+    @Override protected long getTestTimeout() {
+        return 60000;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected int gridCount() {
+        return 4;
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoveAll() throws Exception {
+        IgniteCache<Integer, String> cache = grid(0).cache(null);
+
+        for (int i = 0; i < 10_000; ++i)
+            cache.put(i, "val");
+
+        final AtomicInteger igniteId = new AtomicInteger(gridCount());
+
+        IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+            @Override public Object call() throws Exception {
+                for (int i = 0; i < 2; ++i)
+                    startGrid(igniteId.getAndIncrement());
+
+                return true;
+            }
+        }, 3, "start-node-thread");
+
+        cache.removeAll();
+
+        fut.get();
+
+        U.sleep(5000);
+
+        for (int i = 0; i < igniteId.get(); ++i) {
+            IgniteCache locCache = grid(i).cache(null);
+
+            assertEquals("Local size: " + locCache.localSize() + "\n" +
+                "On heap: " + locCache.localSize(CachePeekMode.ONHEAP) + "\n" +
+                "Off heap: " + locCache.localSize(CachePeekMode.OFFHEAP) + "\n" +
+                "Swap: " + locCache.localSize(CachePeekMode.SWAP) + "\n" +
+                "Primary: " + locCache.localSize(CachePeekMode.PRIMARY) + "\n" +
+                "Backup: " + locCache.localSize(CachePeekMode.BACKUP),
+                0, locCache.localSize());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
new file mode 100644
index 0000000..d61ddcc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Test none rebalance mode.
+ */
+public class NoneRebalanceModeSelfTest extends GridCommonAbstractTest {
+    /** {@inheritDoc} */
+    @SuppressWarnings({"ConstantConditions"})
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration c = super.getConfiguration(gridName);
+
+        CacheConfiguration cc = defaultCacheConfiguration();
+
+        cc.setRebalanceMode(NONE);
+
+        c.setCacheConfiguration(cc);
+
+        return c;
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        startGrid(0);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testRemoveAll() throws Exception {
+        GridNearTransactionalCache cache = (GridNearTransactionalCache)((IgniteKernal)grid(0)).internalCache(null);
+
+        for (GridDhtLocalPartition part : cache.dht().topology().localPartitions())
+            assertEquals(MOVING, part.state());
+
+        grid(0).cache(null).removeAll();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index dc3a2c0..5738778 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -58,6 +58,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
         suite.addTestSuite(GridCachePartitionedGetSelfTest.class);
         suite.addTest(new TestSuite(GridCachePartitionedBasicApiTest.class));
         suite.addTest(new TestSuite(GridCacheNearMultiGetSelfTest.class));
+        suite.addTest(new TestSuite(NoneRebalanceModeSelfTest.class));
         suite.addTest(new TestSuite(GridCacheNearJobExecutionSelfTest.class));
         suite.addTest(new TestSuite(GridCacheNearOneNodeSelfTest.class));
         suite.addTest(new TestSuite(GridCacheNearMultiNodeSelfTest.class));

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 8eb0688..aaf7e5b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -126,6 +126,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
 
         suite.addTestSuite(CacheNoValueClassOnServerNodeTest.class);
 
+        suite.addTestSuite(CacheRemoveAllSelfTest.class);
+
         suite.addTestSuite(CacheOffheapMapEntrySelfTest.class);
 
         return suite;


[14/21] incubator-ignite git commit: ignite-430 review

Posted by sb...@apache.org.
ignite-430 review


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fe78d42c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fe78d42c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fe78d42c

Branch: refs/heads/ignite-866
Commit: fe78d42cfe77626b7250875e0c3f608b27dc9367
Parents: 7ee8517
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu May 14 16:36:06 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:37 2015 +0300

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerClient.java       |  86 --------------
 .../socket/ZWordsSocketStreamerClient.java      |  81 --------------
 .../socket/ZWordsSocketStreamerServer.java      | 111 -------------------
 .../examples/streaming/socket/package-info.java |  21 ----
 .../streaming/wordcount/QueryWords.java         |   6 +
 .../streaming/wordcount/StreamWords.java        |   6 +
 .../socket/WordsSocketStreamerClient.java       |  82 ++++++++++++++
 .../socket/WordsSocketStreamerServer.java       | 111 +++++++++++++++++++
 .../wordcount/socket/package-info.java          |  21 ++++
 9 files changed, 226 insertions(+), 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
deleted file mode 100644
index c5ec079..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * Sends words to socket server based on {@link SocketStreamer} using message size based protocol.
- * <p>
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *     <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
- *     <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
- *     <li>Start querying popular words using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class WordsSocketStreamerClient {
-    /** Port. */
-    private static final int PORT = 5555;
-
-    /**
-     * @param args Args.
-     */
-    public static void main(String[] args) throws IOException {
-        InetAddress addr = InetAddress.getLocalHost();
-
-        try (Socket sock = new Socket(addr, PORT);
-             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
-
-            System.out.println("Words streaming started.");
-
-            while (true) {
-                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
-                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
-                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
-                        for (String word : line.split(" ")) {
-                            if (!word.isEmpty()) {
-                                // Stream words into Ignite through socket.
-                                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
-
-                                    // Write message
-                                    out.writeObject(word);
-
-                                    byte[] arr = bos.toByteArray();
-
-                                    // Write message length
-                                    oos.write(arr.length >>> 24);
-                                    oos.write(arr.length >>> 16);
-                                    oos.write(arr.length >>> 8);
-                                    oos.write(arr.length);
-
-                                    oos.write(arr);
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
deleted file mode 100644
index c17ccdc..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
- * Example illustrates usage of TCP socket streamer in case of non-Java clients.
- * In this example words are zero-terminated strings.
- * <p>
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *     <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
- *     <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
- *     <li>Start querying popular words using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class ZWordsSocketStreamerClient {
-    /** Port. */
-    private static final int PORT = 5555;
-
-    /** Delimiter. */
-    private static final byte[] DELIM = new byte[] {0};
-
-    /**
-     * @param args Args.
-     */
-    public static void main(String[] args) throws IOException {
-        InetAddress addr = InetAddress.getLocalHost();
-
-        try (Socket sock = new Socket(addr, PORT);
-             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
-
-            System.out.println("Words streaming started.");
-
-            while (true) {
-                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
-                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
-                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
-                        for (String word : line.split(" ")) {
-                            if (!word.isEmpty()) {
-                                // Stream words into Ignite through socket.
-                                byte[] arr = word.getBytes("ASCII");
-
-                                // Write message
-                                oos.write(arr);
-
-                                // Write message delimiter
-                                oos.write(DELIM);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
deleted file mode 100644
index a0ef9da..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
- * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
- * In this example words are zero-terminated strings.
- * <p>
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *     <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
- *     <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
- *     <li>Start querying popular words using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class ZWordsSocketStreamerServer {
-    /** Port. */
-    private static final int PORT = 5555;
-
-    /** Delimiter. */
-    private static final byte[] DELIM = new byte[] {0};
-
-    /**
-     * @param args Args.
-     */
-    public static void main(String[] args) throws InterruptedException, IOException {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
-
-        if (!ExamplesUtils.hasServerNodes(ignite)) {
-            ignite.close();
-
-            return;
-        }
-
-        // The cache is configured with sliding window holding 1 second of the streaming data.
-        IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
-        IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
-
-        InetAddress addr = InetAddress.getLocalHost();
-
-        // Configure socket streamer
-        SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
-
-        sockStmr.setAddr(addr);
-
-        sockStmr.setPort(PORT);
-
-        sockStmr.setDelimiter(DELIM);
-
-        sockStmr.setIgnite(ignite);
-
-        sockStmr.setStreamer(stmr);
-
-        // Converter from zero-terminated string to Java strings.
-        sockStmr.setConverter(new SocketMessageConverter<String>() {
-            @Override public String convert(byte[] msg) {
-                try {
-                    return new String(msg, "ASCII");
-                }
-                catch (UnsupportedEncodingException e) {
-                    throw new IgniteException(e);
-                }
-            }
-        });
-
-        sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
-            @Override public Map.Entry<AffinityUuid, String> extract(String word) {
-                // By using AffinityUuid we ensure that identical
-                // words are processed on the same cluster node.
-                return new IgniteBiTuple<>(new AffinityUuid(word), word);
-            }
-        });
-
-        sockStmr.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
deleted file mode 100644
index c516ab4..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains {@link org.apache.ignite.stream.socket.SocketStreamer} usage examples.
- */
-package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
index 149aa79..faf8b51 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -36,6 +36,12 @@ import java.util.*;
  * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class QueryWords {
+    /**
+     * Schedules words query execution.
+     *
+     * @param args Command line arguments (none required).
+     * @throws Exception If failed.
+     */
     public static void main(String[] args) throws Exception {
         // Mark this cluster member as client.
         Ignition.setClientMode(true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
index cc3c0cb..26be178 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -35,6 +35,12 @@ import java.io.*;
  * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class StreamWords {
+    /**
+     * Starts words streaming.
+     *
+     * @param args Command line arguments (none required).
+     * @throws Exception If failed.
+     */
     public static void main(String[] args) throws Exception {
         // Mark this cluster member as client.
         Ignition.setClientMode(true);

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
new file mode 100644
index 0000000..ea3beaa
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount.socket;
+
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
+ * Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ *     <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerClient {
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0};
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws IOException {
+        InetAddress addr = InetAddress.getLocalHost();
+
+        try (
+            Socket sock = new Socket(addr, PORT);
+            OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
+        ) {
+            System.out.println("Words streaming started.");
+
+            while (true) {
+                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+                        for (String word : line.split(" ")) {
+                            if (!word.isEmpty()) {
+                                // Stream words into Ignite through socket.
+                                byte[] arr = word.getBytes("ASCII");
+
+                                // Write message
+                                oos.write(arr);
+
+                                // Write message delimiter
+                                oos.write(DELIM);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
new file mode 100644
index 0000000..259c925
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
+ * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ *     <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerServer {
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0};
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
+
+        if (!ExamplesUtils.hasServerNodes(ignite)) {
+            ignite.close();
+
+            return;
+        }
+
+        // The cache is configured with sliding window holding 1 second of the streaming data.
+        IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+        IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
+
+        InetAddress addr = InetAddress.getLocalHost();
+
+        // Configure socket streamer
+        SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
+
+        sockStmr.setAddr(addr);
+
+        sockStmr.setPort(PORT);
+
+        sockStmr.setDelimiter(DELIM);
+
+        sockStmr.setIgnite(ignite);
+
+        sockStmr.setStreamer(stmr);
+
+        // Converter from zero-terminated string to Java strings.
+        sockStmr.setConverter(new SocketMessageConverter<String>() {
+            @Override public String convert(byte[] msg) {
+                try {
+                    return new String(msg, "ASCII");
+                }
+                catch (UnsupportedEncodingException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+
+        sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+            @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+                // By using AffinityUuid we ensure that identical
+                // words are processed on the same cluster node.
+                return new IgniteBiTuple<>(new AffinityUuid(word), word);
+            }
+        });
+
+        sockStmr.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/package-info.java
new file mode 100644
index 0000000..048299f
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains {@link org.apache.ignite.stream.socket.SocketStreamer} usage examples.
+ */
+package org.apache.ignite.examples.streaming.wordcount.socket;


[19/21] incubator-ignite git commit: # removed cleaning of IpFinder by coordinator on topology changes.

Posted by sb...@apache.org.
# removed cleaning of IpFinder by coordinator on topology changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ee408190
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ee408190
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ee408190

Branch: refs/heads/ignite-866
Commit: ee40819088fe3724ad7e7cad66d501d7db237102
Parents: 7fa8abc
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 15 17:53:28 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 15 17:53:28 2015 +0300

----------------------------------------------------------------------
 .../spi/discovery/tcp/TcpDiscoverySpi.java      | 26 --------------------
 1 file changed, 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ee408190/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 1dea37a..ed0e9dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -3952,18 +3952,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 long topVer;
 
                 if (locNodeCoord) {
-                    if (!msg.client() && ipFinder.isShared()) {
-                        try {
-                            ipFinder.unregisterAddresses(leftNode.socketAddresses());
-                        }
-                        catch (IgniteSpiException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to unregister left node address: " + leftNode);
-
-                            onException("Failed to unregister left node address: " + leftNode, e);
-                        }
-                    }
-
                     topVer = ring.incrementTopologyVersion();
 
                     msg.topologyVersion(topVer);
@@ -4131,20 +4119,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
                 long topVer;
 
                 if (locNodeCoord) {
-                    if (!node.isClient() && ipFinder.isShared()) {
-                        try {
-                            ipFinder.unregisterAddresses(node.socketAddresses());
-                        }
-                        catch (IgniteSpiException e) {
-                            if (log.isDebugEnabled())
-                                log.debug("Failed to unregister failed node address [node=" + node +
-                                    ", err=" + e.getMessage() + ']');
-
-                            onException("Failed to unregister failed node address [node=" + node +
-                                ", err=" + e.getMessage() + ']', e);
-                        }
-                    }
-
                     topVer = ring.incrementTopologyVersion();
 
                     msg.topologyVersion(topVer);


[10/21] incubator-ignite git commit: # ignite-669 - streaming design.

Posted by sb...@apache.org.
# ignite-669 - streaming design.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b0fbfa05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b0fbfa05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b0fbfa05

Branch: refs/heads/ignite-866
Commit: b0fbfa05cb1cb712fe217da7166d945c7c2ec597
Parents: 877eb76
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Thu Apr 2 03:13:04 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:24 2015 +0300

----------------------------------------------------------------------
 .../main/java/org/apache/ignite/stream/adapters/StreamAdapter.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0fbfa05/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index 9d4772f..c729362 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -37,7 +37,7 @@ public abstract class StreamAdapter<T, K, V> {
     /**
      * Empty constructor.
      */
-    public StreamAdapter() {
+    protected StreamAdapter() {
         // No-op.
     }
 


[21/21] incubator-ignite git commit: ignite-866 NPE during clean up

Posted by sb...@apache.org.
ignite-866 NPE during clean up


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d87a38c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d87a38c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d87a38c3

Branch: refs/heads/ignite-866
Commit: d87a38c3151ecfe0ee512516764f54a115d48abb
Parents: 94e202a
Author: agura <ag...@gridgain.com>
Authored: Thu May 14 14:40:38 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 18:48:14 2015 +0300

----------------------------------------------------------------------
 .../processors/cache/CacheMetricsImpl.java      | 62 +++++++++++++++-----
 .../processors/cache/GridCacheAdapter.java      | 12 +++-
 .../processors/cache/GridCacheContext.java      |  8 +--
 3 files changed, 59 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87a38c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 560de97..af19077 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
 
 import org.apache.ignite.*;
 import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
 import org.apache.ignite.internal.processors.cache.store.*;
 import org.apache.ignite.internal.util.tostring.*;
 import org.apache.ignite.internal.util.typedef.internal.*;
@@ -118,7 +119,9 @@ public class CacheMetricsImpl implements CacheMetrics {
     /** {@inheritDoc} */
     @Override public long getOverflowSize() {
         try {
-            return cctx.cache().overflowSize();
+            GridCacheAdapter<?, ?> cache = cctx.cache();
+
+            return cache != null ? cache.overflowSize() : -1;
         }
         catch (IgniteCheckedException ignored) {
             return -1;
@@ -127,34 +130,47 @@ public class CacheMetricsImpl implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public long getOffHeapEntriesCount() {
-        return cctx.cache().offHeapEntriesCount();
+        GridCacheAdapter<?, ?> cache = cctx.cache();
+
+        return cache != null ? cache.offHeapEntriesCount() : -1;
     }
 
     /** {@inheritDoc} */
     @Override public long getOffHeapAllocatedSize() {
-        return cctx.cache().offHeapAllocatedSize();
+        GridCacheAdapter<?, ?> cache = cctx.cache();
+
+        return cache != null ? cache.offHeapAllocatedSize() : -1;
     }
 
     /** {@inheritDoc} */
     @Override public int getSize() {
-        return cctx.cache().size();
+        GridCacheAdapter<?, ?> cache = cctx.cache();
+
+        return cache != null ? cache.size() : 0;
     }
 
     /** {@inheritDoc} */
     @Override public int getKeySize() {
-        return cctx.cache().size();
+        return getSize();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isEmpty() {
-        return cctx.cache().isEmpty();
+        GridCacheAdapter<?, ?> cache = cctx.cache();
+
+        return cache == null || cache.isEmpty();
     }
 
     /** {@inheritDoc} */
     @Override public int getDhtEvictQueueCurrentSize() {
-        return cctx.isNear() ?
-                dhtCtx != null ? dhtCtx.evicts().evictQueueSize() : -1
-                : cctx.evicts().evictQueueSize();
+        GridCacheContext<?, ?> ctx = cctx.isNear() ? dhtCtx : cctx;
+
+        if (ctx == null)
+            return -1;
+
+        GridCacheEvictionManager evictMgr = ctx.evicts();
+
+        return evictMgr != null ? evictMgr.evictQueueSize() : -1;
     }
 
     /** {@inheritDoc} */
@@ -548,37 +564,51 @@ public class CacheMetricsImpl implements CacheMetrics {
 
     /** {@inheritDoc} */
     @Override public String getKeyType() {
-        return cctx.config().getKeyType().getName();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null ? ccfg.getKeyType().getName() : null;
     }
 
     /** {@inheritDoc} */
     @Override public String getValueType() {
-        return cctx.config().getValueType().getName();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null ? ccfg.getValueType().getName() : null;
     }
 
     /** {@inheritDoc} */
     @Override public boolean isReadThrough() {
-        return cctx.config().isReadThrough();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isReadThrough();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isWriteThrough() {
-        return cctx.config().isWriteThrough();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isWriteThrough();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isStoreByValue() {
-        return cctx.config().isStoreByValue();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isStoreByValue();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isStatisticsEnabled() {
-        return cctx.config().isStatisticsEnabled();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isStatisticsEnabled();
     }
 
     /** {@inheritDoc} */
     @Override public boolean isManagementEnabled() {
-        return cctx.config().isManagementEnabled();
+        CacheConfiguration ccfg = cctx.config();
+
+        return ccfg != null && ccfg.isManagementEnabled();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87a38c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 4106cb0..5573e7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3249,7 +3249,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public long overflowSize() throws IgniteCheckedException {
-        return ctx.swap().swapSize();
+        GridCacheSwapManager swapMgr = ctx.swap();
+
+        return swapMgr != null ? swapMgr.swapSize() : -1;
     }
 
     /**
@@ -3802,12 +3804,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
 
     /** {@inheritDoc} */
     @Override public long offHeapEntriesCount() {
-        return ctx.swap().offHeapEntriesCount();
+        GridCacheSwapManager swapMgr = ctx.swap();
+
+        return swapMgr != null ? swapMgr.offHeapEntriesCount() : -1;
     }
 
     /** {@inheritDoc} */
     @Override public long offHeapAllocatedSize() {
-        return ctx.swap().offHeapAllocatedSize();
+        GridCacheSwapManager swapMgr = ctx.swap();
+
+        return swapMgr != null ? swapMgr.offHeapAllocatedSize() : -1;
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87a38c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 2eeaed6..10b8235 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -98,7 +98,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     private IgniteLogger log;
 
     /** Cache configuration. */
-    private CacheConfiguration cacheCfg;
+    private volatile CacheConfiguration cacheCfg;
 
     /** Unsafe memory object for direct memory allocation. */
     private GridUnsafeMemory unsafeMemory;
@@ -116,10 +116,10 @@ public class GridCacheContext<K, V> implements Externalizable {
     private CacheContinuousQueryManager contQryMgr;
 
     /** Swap manager. */
-    private GridCacheSwapManager swapMgr;
+    private volatile GridCacheSwapManager swapMgr;
 
     /** Evictions manager. */
-    private GridCacheEvictionManager evictMgr;
+    private volatile GridCacheEvictionManager evictMgr;
 
     /** Data structures manager. */
     private CacheDataStructuresManager dataStructuresMgr;
@@ -149,7 +149,7 @@ public class GridCacheContext<K, V> implements Externalizable {
     private GridCacheGateway<K, V> gate;
 
     /** Grid cache. */
-    private GridCacheAdapter<K, V> cache;
+    private volatile GridCacheAdapter<K, V> cache;
 
     /** Cached local rich node. */
     private ClusterNode locNode;


[02/21] incubator-ignite git commit: Merge branch 'ignite-timeout' into ignite-sprint-4

Posted by sb...@apache.org.
Merge branch 'ignite-timeout' into ignite-sprint-4


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/478cc7bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/478cc7bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/478cc7bf

Branch: refs/heads/ignite-866
Commit: 478cc7bfe8a507982ac0ad9fd7d3370d19a936fa
Parents: b95d2ae de1c80e
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue May 12 17:46:50 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue May 12 17:46:50 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/communication/tcp/TcpCommunicationSpi.java    | 2 +-
 .../ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java     | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[11/21] incubator-ignite git commit: ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket.

Posted by sb...@apache.org.
ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/53995dcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/53995dcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/53995dcb

Branch: refs/heads/ignite-866
Commit: 53995dcb3470de07df73d8dfd284da0dbb8df8dd
Parents: b0fbfa0
Author: agura <ag...@gridgain.com>
Authored: Mon Apr 13 18:28:40 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:27 2015 +0300

----------------------------------------------------------------------
 .../streaming/socket/SocketStreamerExample.java | 158 ++++++++++
 .../socket/ZStringsSocketStreamerExample.java   | 151 +++++++++
 .../examples/streaming/socket/package-info.java |  21 ++
 .../internal/util/nio/GridBufferedParser.java   |   4 -
 .../internal/util/nio/GridDelimitedParser.java  |  91 ++++++
 .../util/nio/GridNioDelimitedBuffer.java        | 106 +++++++
 .../ignite/stream/adapters/StreamAdapter.java   |  17 +
 .../stream/socket/IgniteSocketStreamer.java     | 217 +++++++++++++
 .../stream/socket/SocketMessageConverter.java   |  31 ++
 .../ignite/stream/socket/package-info.java      |  21 ++
 .../util/nio/GridNioDelimitedBufferTest.java    | 112 +++++++
 .../socket/IgniteSocketStreamerSelfTest.java    | 315 +++++++++++++++++++
 .../ignite/stream/socket/package-info.java      |  21 ++
 .../testsuites/IgniteStreamTestSuite.java       |  39 +++
 .../testsuites/IgniteUtilSelfTestSuite.java     |   1 +
 15 files changed, 1301 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
new file mode 100644
index 0000000..73cb970
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.stream.socket.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Streams random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * To start the example, you should:
+ * <ul>
+ *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *      <li>Start streaming using {@link SocketStreamerExample}.</li>
+ *      <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class SocketStreamerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of the streaming data.
+            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the same word.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
+                InetAddress addr = InetAddress.getLocalHost();
+
+                IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+
+                sockStmr.setAddr(addr);
+
+                sockStmr.setPort(PORT);
+
+                sockStmr.setIgnite(ignite);
+
+                sockStmr.setStreamer(stmr);
+
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() {
+                    @Override public Map.Entry<Integer, Long> extract(Tuple tuple) {
+                        return new IgniteBiTuple<>(tuple.key, tuple.cnt);
+                    }
+                });
+
+                sockStmr.start();
+
+                sendData(addr, PORT);
+            }
+        }
+    }
+
+    /**
+     * @param addr Address.
+     * @param port Port.
+     */
+    private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
+        try (Socket sock = new Socket(addr, port);
+             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+            while (true) {
+                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
+                    Tuple tuple = new Tuple(RAND.nextInt(RANGE), 1L);
+
+                    out.writeObject(tuple);
+
+                    byte[] arr = bos.toByteArray();
+
+                    oos.write(arr.length >>> 24);
+                    oos.write(arr.length >>> 16);
+                    oos.write(arr.length >>> 8);
+                    oos.write(arr.length);
+
+                    oos.write(arr);
+                }
+            }
+        }
+    }
+
+    /**
+     * Tuple.
+     */
+    private static class Tuple implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0;
+
+        /** Key. */
+        private final int key;
+
+        /** Count. */
+        private final long cnt;
+
+        /**
+         * @param key Key.
+         * @param cnt Count.
+         */
+        public Tuple(int key, long cnt) {
+            this.key = key;
+            this.cnt = cnt;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
new file mode 100644
index 0000000..a535c73
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.stream.socket.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * <p>
+ * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams
+ * zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *      <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
+ *      <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZStringsSocketStreamerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0};
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of the streaming data.
+            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the same word.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
+                InetAddress addr = InetAddress.getLocalHost();
+
+                IgniteSocketStreamer<String, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+
+                sockStmr.setAddr(addr);
+
+                sockStmr.setPort(PORT);
+
+                sockStmr.setDelimiter(DELIM);
+
+                sockStmr.setIgnite(ignite);
+
+                sockStmr.setStreamer(stmr);
+
+                // Converter from zero-terminated string to Java strings.
+                sockStmr.setConverter(new SocketMessageConverter<String>() {
+                    @Override public String convert(byte[] msg) {
+                        try {
+                            return new String(msg, "ASCII");
+                        }
+                        catch (UnsupportedEncodingException e) {
+                            throw new IgniteException(e);
+                        }
+                    }
+                });
+
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, Integer, Long>() {
+                    @Override public Map.Entry<Integer, Long> extract(String input) {
+                        String[] pair = input.split("=");
+                        return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
+                    }
+                });
+
+                sockStmr.start();
+
+                sendData(addr, PORT);
+            }
+        }
+    }
+
+    /**
+     * @param addr Address.
+     * @param port Port.
+     */
+    private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
+        try (Socket sock = new Socket(addr, port);
+             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+            while (true) {
+                int key = RAND.nextInt(RANGE);
+
+                String str = key + "=1";
+
+                byte[] arr = str.getBytes("ASCII");
+
+                oos.write(arr);
+                oos.write(DELIM);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
new file mode 100644
index 0000000..ae7bdf9
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples.
+ */
+package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index 3f81dc4..a03d2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -18,7 +18,6 @@
 package org.apache.ignite.internal.util.nio;
 
 import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
 
 import java.io.*;
 import java.nio.*;
@@ -33,9 +32,6 @@ import java.nio.*;
  *     | MSG_SIZE  |   MESSAGE  | MSG_SIZE  |   MESSAGE  |
  *     +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
  * </pre>
- * <p>
- * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream,
- * isn't equal to these bytes than exception will be thrown.
  */
 public class GridBufferedParser implements GridNioParser {
     /** Buffer metadata key. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
new file mode 100644
index 0000000..256597c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * This class implements stream parser based on {@link GridNioDelimitedBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is appended with
+ * delimiter (bytes array). So, the stream structure is as follows:
+ * <pre>
+ *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ *     |   MESSAGE  | DELIMITER  |  MESSAGE  | DELIMITER  |
+ *     +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ * </pre>
+ */
+public class GridDelimitedParser implements GridNioParser {
+    /** Buffer metadata key. */
+    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Delimiter. */
+    private final byte[] delim;
+
+    /** Direct buffer. */
+    private final boolean directBuf;
+
+    /**
+     * @param delim Delimiter.
+     * @param directBuf Direct buffer.
+     */
+    public GridDelimitedParser(byte[] delim, boolean directBuf) {
+        this.delim = delim;
+        this.directBuf = directBuf;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+        GridNioDelimitedBuffer nioBuf = ses.meta(BUF_META_KEY);
+
+        // Decode for a given session is called per one thread, so there should not be any concurrency issues.
+        // However, we make some additional checks.
+        if (nioBuf == null) {
+            nioBuf = new GridNioDelimitedBuffer(delim);
+
+            GridNioDelimitedBuffer old = ses.addMeta(BUF_META_KEY, nioBuf);
+
+            assert old == null;
+        }
+
+        return nioBuf.read(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+        byte[] msg0 = (byte[])msg;
+
+        int cap = msg0.length + delim.length;
+        ByteBuffer res = directBuf ? ByteBuffer.allocateDirect(cap) : ByteBuffer.allocate(cap);
+
+        res.put(msg0);
+        res.put(delim);
+
+        res.flip();
+
+        return res;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return this.getClass().getSimpleName();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
new file mode 100644
index 0000000..2b764ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Buffer with message delimiter support.
+ */
+public class GridNioDelimitedBuffer {
+    /** Delimiter. */
+    private final byte[] delim;
+
+    /** Data. */
+    private byte[] data = new byte[16384];
+
+    /** Count. */
+    private int cnt;
+
+    /** Index. */
+    private int idx;
+
+    /**
+     * @param delim Delimiter.
+     */
+    public GridNioDelimitedBuffer(byte[] delim) {
+        assert delim != null;
+        assert delim.length > 0;
+
+        this.delim = delim;
+
+        reset();
+    }
+
+    /**
+     * Resets buffer state.
+     */
+    private void reset() {
+        cnt = 0;
+        idx = 0;
+    }
+
+    /**
+     * @param buf Buffer.
+     * @return Message bytes or {@code null} if message is not fully read yet.
+     */
+    @Nullable public byte[] read(ByteBuffer buf) {
+        while(buf.hasRemaining()) {
+            if (cnt == data.length)
+                data = Arrays.copyOf(data, data.length * 2);
+
+            byte b = buf.get();
+
+            data[cnt++] = b;
+
+            if (b == delim[idx])
+                idx++;
+            else if (idx > 0) {
+                int pos = cnt - idx;
+
+                idx = 0;
+
+                for (int i = pos; i < cnt; i++) {
+                    if (data[pos] == delim[idx]) {
+                        pos++;
+
+                        idx++;
+                    }
+                    else {
+                        pos = cnt - idx;
+
+                        idx = 0;
+                    }
+                }
+            }
+
+            if (idx == delim.length) {
+                byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
+
+                reset();
+
+                return bytes;
+            }
+        }
+
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index c729362..b99521a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -34,6 +34,9 @@ public abstract class StreamAdapter<T, K, V> {
     /** Streamer. */
     private IgniteDataStreamer<K, V> stmr;
 
+    /** Ignite. */
+    private Ignite ignite;
+
     /**
      * Empty constructor.
      */
@@ -81,6 +84,20 @@ public abstract class StreamAdapter<T, K, V> {
     }
 
     /**
+     * @return Provided {@link Ignite} instance.
+     */
+    public Ignite getIgnite() {
+        return ignite;
+    }
+
+    /**
+     * @param ignite {@link Ignite} instance.
+     */
+    public void setIgnite(Ignite ignite) {
+        this.ignite = ignite;
+    }
+
+    /**
      * Converts given message to a tuple and adds it to the underlying streamer.
      *
      * @param msg Message to convert.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
new file mode 100644
index 0000000..66369ea
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.stream.adapters.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.nio.*;
+
+/**
+ * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
+ * streams into {@link IgniteDataStreamer} instance.
+ * <p>
+ * By default server uses size-based message processing. That is every message sent over the socket is prepended with
+ * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
+ * delimiter-based message processing will be used. That is every message sent over the socket is appended with
+ * provided delimiter.
+ * <p>
+ * Received messages through socket converts to Java object using standard serialization. Conversion functionality
+ * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
+ * non Java clients).
+ */
+public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+    /** Default threads. */
+    private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Address. */
+    private InetAddress addr;
+
+    /** Server port. */
+    private int port;
+
+    /** Threads number. */
+    private int threads = DFLT_THREADS;
+
+    /** Direct mode. */
+    private boolean directMode;
+
+    /** Delimiter. */
+    private byte[] delim;
+
+    /** Converter. */
+    private SocketMessageConverter<T> converter;
+
+    /** Server. */
+    private GridNioServer<byte[]> srv;
+
+    /**
+     * Sets server address.
+     *
+     * @param addr Address.
+     */
+    public void setAddr(InetAddress addr) {
+        this.addr = addr;
+    }
+
+    /**
+     * Sets port number.
+     *
+     * @param port Port.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Sets threadds amount.
+     *
+     * @param threads Threads.
+     */
+    public void setThreads(int threads) {
+        this.threads = threads;
+    }
+
+    /**
+     * Sets direct mode flag.
+     *
+     * @param directMode Direct mode.
+     */
+    public void setDirectMode(boolean directMode) {
+        this.directMode = directMode;
+    }
+
+    /**
+     * Sets message delimiter.
+     *
+     * @param delim Delimiter.
+     */
+    public void setDelimiter(byte[] delim) {
+        this.delim = delim;
+    }
+
+    /**
+     * Sets message converter.
+     *
+     * @param converter Converter.
+     */
+    public void setConverter(SocketMessageConverter<T> converter) {
+        this.converter = converter;
+    }
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        A.notNull(getTupleExtractor(), "tupleExtractor");
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
+        A.ensure(threads > 0, "threads > 0");
+
+        log = getIgnite().log();
+
+        GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
+            @Override public void onConnected(GridNioSession ses) {
+                assert ses.accepted();
+
+                if (log.isDebugEnabled())
+                    log.debug("Accepted connection: " + ses.remoteAddress());
+            }
+
+            @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                if (e != null)
+                    log.error("Connection failed with exception", e);
+            }
+
+            @Override public void onMessage(GridNioSession ses, byte[] msg) {
+                addMessage(converter.convert(msg));
+            }
+        };
+
+        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+        GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
+            new GridDelimitedParser(delim, directMode);
+
+        if (converter == null)
+            converter = new DefaultConverter<>();
+
+        GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+        GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+        try {
+            srv = new GridNioServer.Builder<byte[]>()
+                .address(addr == null ? InetAddress.getLocalHost() : addr)
+                .port(port)
+                .listener(lsnr)
+                .logger(log)
+                .selectorCount(threads)
+                .byteOrder(byteOrder)
+                .filters(filters)
+                .build();
+        }
+        catch (IgniteCheckedException | UnknownHostException e) {
+            throw new IgniteException(e);
+        }
+
+        srv.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server started on " + addr + ':' + port);
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        srv.stop();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server stopped");
+    }
+
+    /**
+     * Converts message to Java object using Jdk marshaller.
+     */
+    private static class DefaultConverter<T> implements SocketMessageConverter<T> {
+        /** Marshaller. */
+        private static final JdkMarshaller MARSH = new JdkMarshaller();
+
+        /** {@inheritDoc} */
+        @Override public T convert(byte[] msg) {
+            try {
+                return MARSH.unmarshal(msg, null);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
new file mode 100644
index 0000000..8161d86
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+/**
+ * Socket message converter.
+ */
+public interface SocketMessageConverter<T> {
+    /**
+     * Converter message represented by array of bytes to object.
+     *
+     * @param msg Message.
+     * @return Converted object.
+     */
+    public T convert(byte[] msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..e1cef65
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains socket streamer implementation.
+ */
+package org.apache.ignite.stream.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
new file mode 100644
index 0000000..a0dd2e5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import junit.framework.TestCase;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Tests for {@link GridNioDelimitedBuffer}.
+ */
+public class GridNioDelimitedBufferTest extends TestCase {
+    /** */
+    private static final String ASCII = "ASCII";
+
+    /**
+     * Tests simple delimiter (excluded from alphabet)
+     */
+    public void testReadZString() throws Exception {
+        Random rnd = new Random();
+
+        int buffSize = 0;
+
+        byte[] delim = new byte[] {0};
+
+        List<String> strs = new ArrayList<>(50);
+
+        for (int i = 0; i < 50; i++) {
+            int len = rnd.nextInt(128) + 1;
+
+            buffSize += len + delim.length;
+
+            StringBuilder sb = new StringBuilder(len);
+
+            for (int j = 0; j < len; j++)
+                sb.append((char)(rnd.nextInt(26) + 'a'));
+
+
+            strs.add(sb.toString());
+        }
+
+        ByteBuffer buff = ByteBuffer.allocate(buffSize);
+
+        for (String str : strs) {
+            buff.put(str.getBytes(ASCII));
+            buff.put(delim);
+        }
+
+        buff.flip();
+
+        byte[] msg;
+
+        GridNioDelimitedBuffer delimBuff = new GridNioDelimitedBuffer(delim);
+
+        List<String> res = new ArrayList<>(strs.size());
+
+        while ((msg = delimBuff.read(buff)) != null)
+            res.add(new String(msg, ASCII));
+
+        assertEquals(strs, res);
+    }
+
+    /**
+     * Tests compound delimiter (included to alphabet)
+     */
+    public void testDelim() throws Exception {
+        byte[] delim = "aabb".getBytes(ASCII);
+
+        List<String> strs = Arrays.asList("za", "zaa", "zaab", "zab", "zaabaababbbbabaab");
+
+        int buffSize = 0;
+
+        for (String str : strs)
+            buffSize += str.length() + delim.length;
+
+        ByteBuffer buff = ByteBuffer.allocate(buffSize);
+
+        for (String str : strs) {
+            buff.put(str.getBytes(ASCII));
+            buff.put(delim);
+        }
+
+        buff.flip();
+
+        byte[] msg;
+
+        GridNioDelimitedBuffer delimBuff = new GridNioDelimitedBuffer(delim);
+
+        List<String> res = new ArrayList<>(strs.size());
+
+        while ((msg = delimBuff.read(buff)) != null)
+            res.add(new String(msg, ASCII));
+
+        assertEquals(strs, res);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
new file mode 100644
index 0000000..19852ce
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests {@link IgniteSocketStreamer}.
+ */
+public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Grid count. */
+    private final static int GRID_CNT = 3;
+
+    /** Count. */
+    private static final int CNT = 500;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0};
+
+    /** Port. */
+    private static int port;
+
+    /** Ignite. */
+    private static Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration() throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration();
+
+        CacheConfiguration ccfg = cacheConfiguration(cfg, null);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite = startGrids(GRID_CNT);
+        ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+        try (ServerSocket sock = new ServerSocket(0)) {
+            port = sock.getLocalPort();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        ignite.cache(null).clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSizeBasedDefaultConverter() throws Exception {
+        test(null, null, new Runnable() {
+            @Override public void run() {
+                try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                     OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+                    Marshaller marsh = new JdkMarshaller();
+
+                    for (int i = 0; i < CNT; i++) {
+                        byte[] msg = marsh.marshal(new Tuple(i));
+
+                        os.write(msg.length >>> 24);
+                        os.write(msg.length >>> 16);
+                        os.write(msg.length >>> 8);
+                        os.write(msg.length);
+
+                        os.write(msg);
+                    }
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSizeBasedCustomConverter() throws Exception {
+        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+            @Override public Tuple convert(byte[] msg) {
+                int i = (msg[0] & 0xFF) << 24;
+                i |= (msg[1] & 0xFF) << 16;
+                i |= (msg[2] & 0xFF) << 8;
+                i |= msg[3] & 0xFF;
+
+                return new Tuple(i);
+            }
+        };
+
+        test(converter, null, new Runnable() {
+            @Override public void run() {
+                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+                    for (int i = 0; i < CNT; i++) {
+                        os.write(0);
+                        os.write(0);
+                        os.write(0);
+                        os.write(4);
+
+                        os.write(i >>> 24);
+                        os.write(i >>> 16);
+                        os.write(i >>> 8);
+                        os.write(i);
+                    }
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDelimiterBasedDefaultConverter() throws Exception {
+        test(null, DELIM, new Runnable() {
+            @Override public void run() {
+                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+                    Marshaller marsh = new JdkMarshaller();
+
+                    for (int i = 0; i < CNT; i++) {
+                        byte[] msg = marsh.marshal(new Tuple(i));
+
+                        os.write(msg);
+                        os.write(DELIM);
+                    }
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDelimiterBasedCustomConverter() throws Exception {
+        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+            @Override public Tuple convert(byte[] msg) {
+                int i = (msg[0] & 0xFF) << 24;
+                i |= (msg[1] & 0xFF) << 16;
+                i |= (msg[2] & 0xFF) << 8;
+                i |= msg[3] & 0xFF;
+
+                return new Tuple(i);
+            }
+        };
+
+        test(converter, DELIM, new Runnable() {
+            @Override public void run() {
+                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+                    for (int i = 0; i < CNT; i++) {
+                        os.write(i >>> 24);
+                        os.write(i >>> 16);
+                        os.write(i >>> 8);
+                        os.write(i);
+
+                        os.write(DELIM);
+                    }
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param converter Converter.
+     * @param r Runnable..
+     */
+    private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+    {
+        IgniteSocketStreamer<Tuple, Integer, String> sockStmr = null;
+
+        try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
+
+            stmr.allowOverwrite(true);
+            stmr.autoFlushFrequency(10);
+
+            sockStmr = new IgniteSocketStreamer<>();
+
+            IgniteCache<Integer, String> cache = ignite.cache(null);
+
+            sockStmr.setIgnite(ignite);
+
+            sockStmr.setStreamer(stmr);
+
+            sockStmr.setPort(port);
+
+            sockStmr.setDelimiter(delim);
+
+            sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
+                @Override public Map.Entry<Integer, String> extract(Tuple msg) {
+                    return new IgniteBiTuple<>(msg.key, msg.val);
+                }
+            });
+
+            if (converter != null)
+                sockStmr.setConverter(converter);
+
+            final CountDownLatch latch = new CountDownLatch(CNT);
+
+            IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+                @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                    latch.countDown();
+
+                    return true;
+                }
+            };
+
+            ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+            sockStmr.start();
+
+            r.run();
+
+            latch.await();
+
+            assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
+
+            for (int i = 0; i < CNT; i++)
+                assertEquals(Integer.toString(i), cache.get(i));
+        }
+        finally {
+            if (sockStmr != null)
+                sockStmr.stop();
+        }
+
+    }
+
+    /**
+     * Tuple.
+     */
+    private static class Tuple implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Key. */
+        private final int key;
+
+        /** Value. */
+        private final String val;
+
+        /**
+         * @param key Key.
+         */
+        Tuple(int key) {
+            this.key = key;
+            this.val = Integer.toString(key);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..2e28469
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains tests for socket streamer.
+ */
+package org.apache.ignite.stream.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
new file mode 100644
index 0000000..87bbfbb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import org.apache.ignite.stream.socket.*;
+
+import junit.framework.*;
+
+/**
+ * Stream test suite.
+ */
+public class IgniteStreamTestSuite extends TestSuite {
+    /**
+     * @return Stream tests suite.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite suite = new TestSuite("Ignite Stream Test Suite");
+
+        suite.addTest(new TestSuite(IgniteSocketStreamerSelfTest.class));
+
+        return suite;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index 941b06e..32cd038 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -67,6 +67,7 @@ public class IgniteUtilSelfTestSuite extends TestSuite {
         suite.addTestSuite(GridNioSelfTest.class);
         suite.addTestSuite(GridNioFilterChainSelfTest.class);
         suite.addTestSuite(GridNioSslSelfTest.class);
+        suite.addTestSuite(GridNioDelimitedBufferTest.class);
 
         return suite;
     }


[20/21] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/94e202aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/94e202aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/94e202aa

Branch: refs/heads/ignite-866
Commit: 94e202aa7046de67a2c0e0da4230d424e1c95705
Parents: ee40819 15aa1cf
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 15 17:53:29 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 15 17:53:29 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  | 136 ++++++-------------
 .../internal/interop/InteropBootstrap.java      |  34 +++++
 .../interop/InteropBootstrapFactory.java        |  39 ++++++
 .../internal/interop/InteropIgnition.java       | 103 ++++++++++++++
 .../internal/interop/InteropProcessor.java      |  25 ++++
 5 files changed, 240 insertions(+), 97 deletions(-)
----------------------------------------------------------------------



[15/21] incubator-ignite git commit: ignite-430 Words count Socket streamer examples

Posted by sb...@apache.org.
ignite-430 Words count Socket streamer examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/896b426b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/896b426b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/896b426b

Branch: refs/heads/ignite-866
Commit: 896b426bff3e37aeebbb00f54e179492543f1a1d
Parents: fe78d42
Author: agura <ag...@gridgain.com>
Authored: Thu May 14 20:29:33 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:40 2015 +0300

----------------------------------------------------------------------
 .../streaming/StreamTransformerExample.java     |  4 +-
 .../streaming/StreamVisitorExample.java         |  4 +-
 .../socket/WordsSocketStreamerServer.java       | 93 --------------------
 .../streaming/wordcount/QueryWords.java         |  4 +-
 .../streaming/wordcount/StreamWords.java        |  4 +-
 .../socket/WordsSocketStreamerClient.java       | 14 +--
 .../socket/WordsSocketStreamerServer.java       | 27 ++++--
 7 files changed, 31 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java
index 5e95892..966fce2 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java
@@ -30,11 +30,9 @@ import java.util.*;
  * Stream random numbers into the streaming cache.
  * To start the example, you should:
  * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
  *     <li>Start streaming using {@link StreamTransformerExample}.</li>
  * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class StreamTransformerExample {
     /** Random number generator. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
index 0fbce68..baae5af 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
@@ -31,11 +31,9 @@ import java.util.*;
  * Stream random numbers into the streaming cache.
  * To start the example, you should:
  * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
  *     <li>Start streaming using {@link StreamVisitorExample}.</li>
  * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class StreamVisitorExample {
     /** Random number generator. */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
deleted file mode 100644
index 5af746d..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Receives words through socket using {@link SocketStreamer} and message size based protocol
- * and streams them into Ignite cache.
- * <p>
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *     <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
- *     <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
- *     <li>Start querying popular words using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class WordsSocketStreamerServer {
-    /** Port. */
-    private static final int PORT = 5555;
-
-    /**
-     * @param args Args.
-     */
-    public static void main(String[] args) throws InterruptedException, IOException {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
-
-        if (!ExamplesUtils.hasServerNodes(ignite)) {
-            ignite.close();
-
-            return;
-        }
-
-        // The cache is configured with sliding window holding 1 second of the streaming data.
-        IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
-        IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
-
-        InetAddress addr = InetAddress.getLocalHost();
-
-        // Configure socket streamer
-        SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
-
-        sockStmr.setAddr(addr);
-
-        sockStmr.setPort(PORT);
-
-        sockStmr.setIgnite(ignite);
-
-        sockStmr.setStreamer(stmr);
-
-        sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
-            @Override public Map.Entry<AffinityUuid, String> extract(String word) {
-                // By using AffinityUuid we ensure that identical
-                // words are processed on the same cluster node.
-                return new IgniteBiTuple<>(new AffinityUuid(word), word);
-            }
-        });
-
-        sockStmr.start();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
index faf8b51..58c6ef2 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -28,12 +28,10 @@ import java.util.*;
  * Periodically query popular numbers from the streaming cache.
  * To start the example, you should:
  * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
  *     <li>Start streaming using {@link StreamWords}.</li>
  *     <li>Start querying popular words using {@link QueryWords}.</li>
  * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class QueryWords {
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
index 26be178..6024c4b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -27,12 +27,10 @@ import java.io.*;
  * Stream words into Ignite cache.
  * To start the example, you should:
  * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
  *     <li>Start streaming using {@link StreamWords}.</li>
  *     <li>Start querying popular words using {@link QueryWords}.</li>
  * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class StreamWords {
     /**

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
index ea3beaa..c4d7b8c 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
@@ -25,19 +25,19 @@ import java.io.*;
 import java.net.*;
 
 /**
- * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
- * Example illustrates usage of TCP socket streamer in case of non-Java clients.
- * In this example words are zero-terminated strings.
+ * Example demonstrates streaming of data from external components into Ignite cache.
+ * <p>
+ * {@code WordsSocketStreamerClient} is simple socket streaming client implementation that sends words to socket server
+ * based on {@link SocketStreamer} using message delimiter based protocol. Example illustrates usage of TCP socket
+ * streamer in case of non-Java clients. In this example words are zero-terminated strings.
  * <p>
  * To start the example, you should:
  * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
  *     <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
  *     <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
  *     <li>Start querying popular words using {@link QueryWords}.</li>
  * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class WordsSocketStreamerClient {
     /** Port. */
@@ -59,7 +59,7 @@ public class WordsSocketStreamerClient {
             System.out.println("Words streaming started.");
 
             while (true) {
-                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+                try (InputStream in = WordsSocketStreamerClient.class.getResourceAsStream("../alice-in-wonderland.txt");
                      LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
                     for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
                         for (String word : line.split(" ")) {

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 259c925..6a8911c 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -30,19 +30,20 @@ import java.net.*;
 import java.util.*;
 
 /**
- * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
+ * Example demonstrates streaming of data from external components into Ignite cache.
+ * <p>
+ * {@code WordsSocketStreamerServer} is simple socket streaming server implementation that
+ * receives words from socket using {@link SocketStreamer} and message delimiter based protocol
  * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
  * In this example words are zero-terminated strings.
  * <p>
  * To start the example, you should:
  * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
  *     <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
  *     <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
  *     <li>Start querying popular words using {@link QueryWords}.</li>
  * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class WordsSocketStreamerServer {
     /** Port. */
@@ -52,9 +53,12 @@ public class WordsSocketStreamerServer {
     private static final byte[] DELIM = new byte[] {0};
 
     /**
-     * @param args Args.
+     * Starts socket streaming server.
+     *
+     * @param args Command line arguments (none required).
+     * @throws Exception If failed.
      */
-    public static void main(String[] args) throws InterruptedException, IOException {
+    public static void main(String[] args) throws Exception {
         // Mark this cluster member as client.
         Ignition.setClientMode(true);
 
@@ -106,6 +110,15 @@ public class WordsSocketStreamerServer {
             }
         });
 
-        sockStmr.start();
+        try {
+            sockStmr.start();
+        }
+        catch (IgniteException e) {
+            System.out.println("Streaming server didn't start due to an error: ");
+
+            e.printStackTrace();
+
+            ignite.close();
+        }
     }
 }


[13/21] incubator-ignite git commit: ignite-430 Words count Socket streamer examples

Posted by sb...@apache.org.
ignite-430 Words count Socket streamer examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7ee85179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7ee85179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7ee85179

Branch: refs/heads/ignite-866
Commit: 7ee85179103df637ba594fab68755dee71b69997
Parents: d87efce
Author: agura <ag...@gridgain.com>
Authored: Wed May 13 20:56:22 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:34 2015 +0300

----------------------------------------------------------------------
 .../ignite/examples/streaming/package-info.java |   1 -
 .../streaming/socket/SocketStreamerExample.java | 128 --------
 .../socket/WordsSocketStreamerClient.java       |  86 +++++
 .../socket/WordsSocketStreamerServer.java       |  93 ++++++
 .../socket/ZStringsSocketStreamerExample.java   | 141 ---------
 .../socket/ZWordsSocketStreamerClient.java      |  81 +++++
 .../socket/ZWordsSocketStreamerServer.java      | 111 +++++++
 .../examples/streaming/socket/package-info.java |   3 +-
 .../streaming/wordcount/CacheConfig.java        |   2 +-
 .../streaming/wordcount/QueryWords.java         |   2 +-
 .../streaming/wordcount/StreamWords.java        |   2 +-
 .../streaming/wordcount/package-info.java       |   1 -
 .../org/apache/ignite/stream/StreamAdapter.java | 111 +++++++
 .../ignite/stream/StreamTupleExtractor.java     |  33 ++
 .../ignite/stream/adapters/StreamAdapter.java   | 111 -------
 .../stream/adapters/StreamTupleExtractor.java   |  33 --
 .../ignite/stream/adapters/package-info.java    |  21 --
 .../stream/socket/IgniteSocketStreamer.java     | 217 -------------
 .../ignite/stream/socket/SocketStreamer.java    | 218 +++++++++++++
 .../socket/IgniteSocketStreamerSelfTest.java    | 315 -------------------
 .../stream/socket/SocketStreamerSelfTest.java   | 315 +++++++++++++++++++
 .../testsuites/IgniteStreamTestSuite.java       |   2 +-
 22 files changed, 1053 insertions(+), 974 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
index 43dea13..43fbab3 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
@@ -16,7 +16,6 @@
  */
 
 /**
- * <!-- Package description. -->
  * Demonstrates usage of data streamer.
  */
 package org.apache.ignite.examples.streaming;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
deleted file mode 100644
index 487572a..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message size based protocol.
- * <p>
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *     <li>Start streaming using {@link SocketStreamerExample}.</li>
- *     <li>Start querying popular numbers using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class SocketStreamerExample {
-    /** Port. */
-    private static final int PORT = 5555;
-
-    /**
-     * @param args Args.
-     */
-    public static void main(String[] args) throws InterruptedException, IOException {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            // The cache is configured with sliding window holding 1 second of the streaming data.
-            IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
-            try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
-                InetAddress addr = InetAddress.getLocalHost();
-
-                // Configure socket streamer
-                IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
-
-                sockStmr.setAddr(addr);
-
-                sockStmr.setPort(PORT);
-
-                sockStmr.setIgnite(ignite);
-
-                sockStmr.setStreamer(stmr);
-
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
-                    @Override public Map.Entry<AffinityUuid, String> extract(String word) {
-                        // By using AffinityUuid we ensure that identical
-                        // words are processed on the same cluster node.
-                        return new IgniteBiTuple<>(new AffinityUuid(word), word);
-                    }
-                });
-
-                sockStmr.start();
-
-                sendData(addr, PORT);
-            }
-        }
-    }
-
-    /**
-     * @param addr Address.
-     * @param port Port.
-     */
-    private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
-        try (Socket sock = new Socket(addr, port);
-             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
-            while (true) {
-                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
-                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
-                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
-                        for (String word : line.split(" ")) {
-                            if (!word.isEmpty()) {
-                                // Stream words into Ignite through socket.
-                                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
-
-                                    // Write message
-                                    out.writeObject(word);
-
-                                    byte[] arr = bos.toByteArray();
-
-                                    // Write message length
-                                    oos.write(arr.length >>> 24);
-                                    oos.write(arr.length >>> 16);
-                                    oos.write(arr.length >>> 8);
-                                    oos.write(arr.length);
-
-                                    oos.write(arr);
-                                }
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
new file mode 100644
index 0000000..c5ec079
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Sends words to socket server based on {@link SocketStreamer} using message size based protocol.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ *     <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerClient {
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws IOException {
+        InetAddress addr = InetAddress.getLocalHost();
+
+        try (Socket sock = new Socket(addr, PORT);
+             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+            System.out.println("Words streaming started.");
+
+            while (true) {
+                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+                        for (String word : line.split(" ")) {
+                            if (!word.isEmpty()) {
+                                // Stream words into Ignite through socket.
+                                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+                                    // Write message
+                                    out.writeObject(word);
+
+                                    byte[] arr = bos.toByteArray();
+
+                                    // Write message length
+                                    oos.write(arr.length >>> 24);
+                                    oos.write(arr.length >>> 16);
+                                    oos.write(arr.length >>> 8);
+                                    oos.write(arr.length);
+
+                                    oos.write(arr);
+                                }
+                            }
+                        }
+                    }
+                }
+            }
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
new file mode 100644
index 0000000..5af746d
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Receives words through socket using {@link SocketStreamer} and message size based protocol
+ * and streams them into Ignite cache.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ *     <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerServer {
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
+
+        if (!ExamplesUtils.hasServerNodes(ignite)) {
+            ignite.close();
+
+            return;
+        }
+
+        // The cache is configured with sliding window holding 1 second of the streaming data.
+        IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+        IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
+
+        InetAddress addr = InetAddress.getLocalHost();
+
+        // Configure socket streamer
+        SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
+
+        sockStmr.setAddr(addr);
+
+        sockStmr.setPort(PORT);
+
+        sockStmr.setIgnite(ignite);
+
+        sockStmr.setStreamer(stmr);
+
+        sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+            @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+                // By using AffinityUuid we ensure that identical
+                // words are processed on the same cluster node.
+                return new IgniteBiTuple<>(new AffinityUuid(word), word);
+            }
+        });
+
+        sockStmr.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
deleted file mode 100644
index fa5aa28..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message delimiter based
- * protocol.
- * <p>
- * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams
- * zero-terminated strings.
- * <p>
- * To start the example, you should:
- * <ul>
- *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *     <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
- *     <li>Start querying popular numbers using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class ZStringsSocketStreamerExample {
-    /** Port. */
-    private static final int PORT = 5555;
-
-    /** Delimiter. */
-    private static final byte[] DELIM = new byte[] {0};
-
-    /**
-     * @param args Args.
-     */
-    public static void main(String[] args) throws InterruptedException, IOException {
-        // Mark this cluster member as client.
-        Ignition.setClientMode(true);
-
-        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
-            if (!ExamplesUtils.hasServerNodes(ignite))
-                return;
-
-            // The cache is configured with sliding window holding 1 second of the streaming data.
-            IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
-            try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
-                InetAddress addr = InetAddress.getLocalHost();
-
-                // Configure socket streamer
-                IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
-
-                sockStmr.setAddr(addr);
-
-                sockStmr.setPort(PORT);
-
-                sockStmr.setDelimiter(DELIM);
-
-                sockStmr.setIgnite(ignite);
-
-                sockStmr.setStreamer(stmr);
-
-                // Converter from zero-terminated string to Java strings.
-                sockStmr.setConverter(new SocketMessageConverter<String>() {
-                    @Override public String convert(byte[] msg) {
-                        try {
-                            return new String(msg, "ASCII");
-                        }
-                        catch (UnsupportedEncodingException e) {
-                            throw new IgniteException(e);
-                        }
-                    }
-                });
-
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
-                    @Override public Map.Entry<AffinityUuid, String> extract(String word) {
-                        // By using AffinityUuid we ensure that identical
-                        // words are processed on the same cluster node.
-                        return new IgniteBiTuple<>(new AffinityUuid(word), word);
-                    }
-                });
-
-                sockStmr.start();
-
-                sendData(addr, PORT);
-            }
-        }
-    }
-
-    /**
-     * @param addr Address.
-     * @param port Port.
-     */
-    private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
-        try (Socket sock = new Socket(addr, port);
-             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
-
-            while (true) {
-                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
-                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
-                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
-                        for (String word : line.split(" ")) {
-                            if (!word.isEmpty()) {
-                                // Stream words into Ignite through socket.
-                                byte[] arr = word.getBytes("ASCII");
-
-                                // Write message
-                                oos.write(arr);
-
-                                // Write message delimiter
-                                oos.write(DELIM);
-                            }
-                        }
-                    }
-                }
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
new file mode 100644
index 0000000..c17ccdc
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
+ * Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
+ *     <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZWordsSocketStreamerClient {
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0};
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws IOException {
+        InetAddress addr = InetAddress.getLocalHost();
+
+        try (Socket sock = new Socket(addr, PORT);
+             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+            System.out.println("Words streaming started.");
+
+            while (true) {
+                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+                        for (String word : line.split(" ")) {
+                            if (!word.isEmpty()) {
+                                // Stream words into Ignite through socket.
+                                byte[] arr = word.getBytes("ASCII");
+
+                                // Write message
+                                oos.write(arr);
+
+                                // Write message delimiter
+                                oos.write(DELIM);
+                            }
+                        }
+                    }
+                }
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
new file mode 100644
index 0000000..a0ef9da
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
+ * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
+ *     <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZWordsSocketStreamerServer {
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0};
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
+
+        if (!ExamplesUtils.hasServerNodes(ignite)) {
+            ignite.close();
+
+            return;
+        }
+
+        // The cache is configured with sliding window holding 1 second of the streaming data.
+        IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+        IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
+
+        InetAddress addr = InetAddress.getLocalHost();
+
+        // Configure socket streamer
+        SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
+
+        sockStmr.setAddr(addr);
+
+        sockStmr.setPort(PORT);
+
+        sockStmr.setDelimiter(DELIM);
+
+        sockStmr.setIgnite(ignite);
+
+        sockStmr.setStreamer(stmr);
+
+        // Converter from zero-terminated string to Java strings.
+        sockStmr.setConverter(new SocketMessageConverter<String>() {
+            @Override public String convert(byte[] msg) {
+                try {
+                    return new String(msg, "ASCII");
+                }
+                catch (UnsupportedEncodingException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+
+        sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+            @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+                // By using AffinityUuid we ensure that identical
+                // words are processed on the same cluster node.
+                return new IgniteBiTuple<>(new AffinityUuid(word), word);
+            }
+        });
+
+        sockStmr.start();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
index d0a480a..c516ab4 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -16,7 +16,6 @@
  */
 
 /**
- * <!-- Package description. -->
- * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples.
+ * Contains {@link org.apache.ignite.stream.socket.SocketStreamer} usage examples.
  */
 package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
index 58704ca..d17b97d 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
@@ -26,7 +26,7 @@ import javax.cache.expiry.*;
 import static java.util.concurrent.TimeUnit.*;
 
 /**
- * Configuration for the streaming cache to store the stream of random numbers.
+ * Configuration for the streaming cache to store the stream of words.
  * This cache is configured with sliding window of 1 second, which means that
  * data older than 1 second will be automatically removed from the cache.
  */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
index 3bd9d3d..149aa79 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -30,7 +30,7 @@ import java.util.*;
  * <ul>
  *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
  *     <li>Start streaming using {@link StreamWords}.</li>
- *     <li>Start querying popular numbers using {@link QueryWords}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
  * </ul>
  * <p>
  * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
index c59fa51..cc3c0cb 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -29,7 +29,7 @@ import java.io.*;
  * <ul>
  *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
  *     <li>Start streaming using {@link StreamWords}.</li>
- *     <li>Start querying popular numbers using {@link QueryWords}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
  * </ul>
  * <p>
  * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
index 010f86a..5d48ae3 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
@@ -16,7 +16,6 @@
  */
 
 /**
- * <!-- Package description. -->
  * Streaming word count example.
  */
 package org.apache.ignite.examples.streaming.wordcount;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
new file mode 100644
index 0000000..0c4e2d1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Convenience adapter for streamers. Adapters are optional components for
+ * streaming from different data sources. The purpose of adapters is to
+ * convert different message formats into Ignite stream key-value tuples
+ * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ */
+public abstract class StreamAdapter<T, K, V> {
+    /** Tuple extractor. */
+    private StreamTupleExtractor<T, K, V> extractor;
+
+    /** Streamer. */
+    private IgniteDataStreamer<K, V> stmr;
+
+    /** Ignite. */
+    private Ignite ignite;
+
+    /**
+     * Empty constructor.
+     */
+    protected StreamAdapter() {
+        // No-op.
+    }
+
+    /**
+     * Stream adapter.
+     *
+     * @param stmr Streamer.
+     * @param extractor Tuple extractor.
+     */
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+        this.stmr = stmr;
+        this.extractor = extractor;
+    }
+
+    /**
+     * @return Provided data streamer.
+     */
+    public IgniteDataStreamer<K, V> getStreamer() {
+        return stmr;
+    }
+
+    /**
+     * @param stmr Ignite data streamer.
+     */
+    public void setStreamer(IgniteDataStreamer<K, V> stmr) {
+        this.stmr = stmr;
+    }
+
+    /**
+     * @return Provided tuple extractor.
+     */
+    public StreamTupleExtractor<T, K, V> getTupleExtractor() {
+        return extractor;
+    }
+
+    /**
+     * @param extractor Extractor for key-value tuples from messages.
+     */
+    public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
+        this.extractor = extractor;
+    }
+
+    /**
+     * @return Provided {@link Ignite} instance.
+     */
+    public Ignite getIgnite() {
+        return ignite;
+    }
+
+    /**
+     * @param ignite {@link Ignite} instance.
+     */
+    public void setIgnite(Ignite ignite) {
+        this.ignite = ignite;
+    }
+
+    /**
+     * Converts given message to a tuple and adds it to the underlying streamer.
+     *
+     * @param msg Message to convert.
+     */
+    protected void addMessage(T msg) {
+        Map.Entry<K, V> e = extractor.extract(msg);
+
+        if (e != null)
+            stmr.addData(e);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
new file mode 100644
index 0000000..d2a4ede
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.*;
+
+/**
+ * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ */
+public interface StreamTupleExtractor<T, K, V> {
+    /**
+     * Extracts a key-value tuple from a message.
+     *
+     * @param msg Message.
+     * @return Key-value tuple.
+     */
+    public Map.Entry<K, V> extract(T msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
deleted file mode 100644
index b99521a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.adapters;
-
-import org.apache.ignite.*;
-
-import java.util.*;
-
-/**
- * Convenience adapter for streamers. Adapters are optional components for
- * streaming from different data sources. The purpose of adapters is to
- * convert different message formats into Ignite stream key-value tuples
- * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
- */
-public abstract class StreamAdapter<T, K, V> {
-    /** Tuple extractor. */
-    private StreamTupleExtractor<T, K, V> extractor;
-
-    /** Streamer. */
-    private IgniteDataStreamer<K, V> stmr;
-
-    /** Ignite. */
-    private Ignite ignite;
-
-    /**
-     * Empty constructor.
-     */
-    protected StreamAdapter() {
-        // No-op.
-    }
-
-    /**
-     * Stream adapter.
-     *
-     * @param stmr Streamer.
-     * @param extractor Tuple extractor.
-     */
-    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
-        this.stmr = stmr;
-        this.extractor = extractor;
-    }
-
-    /**
-     * @return Provided data streamer.
-     */
-    public IgniteDataStreamer<K, V> getStreamer() {
-        return stmr;
-    }
-
-    /**
-     * @param stmr Ignite data streamer.
-     */
-    public void setStreamer(IgniteDataStreamer<K, V> stmr) {
-        this.stmr = stmr;
-    }
-
-    /**
-     * @return Provided tuple extractor.
-     */
-    public StreamTupleExtractor<T, K, V> getTupleExtractor() {
-        return extractor;
-    }
-
-    /**
-     * @param extractor Extractor for key-value tuples from messages.
-     */
-    public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
-        this.extractor = extractor;
-    }
-
-    /**
-     * @return Provided {@link Ignite} instance.
-     */
-    public Ignite getIgnite() {
-        return ignite;
-    }
-
-    /**
-     * @param ignite {@link Ignite} instance.
-     */
-    public void setIgnite(Ignite ignite) {
-        this.ignite = ignite;
-    }
-
-    /**
-     * Converts given message to a tuple and adds it to the underlying streamer.
-     *
-     * @param msg Message to convert.
-     */
-    protected void addMessage(T msg) {
-        Map.Entry<K, V> e = extractor.extract(msg);
-
-        if (e != null)
-            stmr.addData(e);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
deleted file mode 100644
index 9b0c395..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.adapters;
-
-import java.util.*;
-
-/**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- */
-public interface StreamTupleExtractor<T, K, V> {
-    /**
-     * Extracts a key-value tuple from a message.
-     *
-     * @param msg Message.
-     * @return Key-value tuple.
-     */
-    public Map.Entry<K, V> extract(T msg);
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
deleted file mode 100644
index a69ffc0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains Ignite stream adapters.
- */
-package org.apache.ignite.stream.adapters;

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
deleted file mode 100644
index 66369ea..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.stream.adapters.*;
-import org.jetbrains.annotations.*;
-
-import java.net.*;
-import java.nio.*;
-
-/**
- * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
- * streams into {@link IgniteDataStreamer} instance.
- * <p>
- * By default server uses size-based message processing. That is every message sent over the socket is prepended with
- * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
- * delimiter-based message processing will be used. That is every message sent over the socket is appended with
- * provided delimiter.
- * <p>
- * Received messages through socket converts to Java object using standard serialization. Conversion functionality
- * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
- * non Java clients).
- */
-public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
-    /** Default threads. */
-    private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
-
-    /** Logger. */
-    private IgniteLogger log;
-
-    /** Address. */
-    private InetAddress addr;
-
-    /** Server port. */
-    private int port;
-
-    /** Threads number. */
-    private int threads = DFLT_THREADS;
-
-    /** Direct mode. */
-    private boolean directMode;
-
-    /** Delimiter. */
-    private byte[] delim;
-
-    /** Converter. */
-    private SocketMessageConverter<T> converter;
-
-    /** Server. */
-    private GridNioServer<byte[]> srv;
-
-    /**
-     * Sets server address.
-     *
-     * @param addr Address.
-     */
-    public void setAddr(InetAddress addr) {
-        this.addr = addr;
-    }
-
-    /**
-     * Sets port number.
-     *
-     * @param port Port.
-     */
-    public void setPort(int port) {
-        this.port = port;
-    }
-
-    /**
-     * Sets threadds amount.
-     *
-     * @param threads Threads.
-     */
-    public void setThreads(int threads) {
-        this.threads = threads;
-    }
-
-    /**
-     * Sets direct mode flag.
-     *
-     * @param directMode Direct mode.
-     */
-    public void setDirectMode(boolean directMode) {
-        this.directMode = directMode;
-    }
-
-    /**
-     * Sets message delimiter.
-     *
-     * @param delim Delimiter.
-     */
-    public void setDelimiter(byte[] delim) {
-        this.delim = delim;
-    }
-
-    /**
-     * Sets message converter.
-     *
-     * @param converter Converter.
-     */
-    public void setConverter(SocketMessageConverter<T> converter) {
-        this.converter = converter;
-    }
-
-    /**
-     * Starts streamer.
-     *
-     * @throws IgniteException If failed.
-     */
-    public void start() {
-        A.notNull(getTupleExtractor(), "tupleExtractor");
-        A.notNull(getStreamer(), "streamer");
-        A.notNull(getIgnite(), "ignite");
-        A.ensure(threads > 0, "threads > 0");
-
-        log = getIgnite().log();
-
-        GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
-            @Override public void onConnected(GridNioSession ses) {
-                assert ses.accepted();
-
-                if (log.isDebugEnabled())
-                    log.debug("Accepted connection: " + ses.remoteAddress());
-            }
-
-            @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
-                if (e != null)
-                    log.error("Connection failed with exception", e);
-            }
-
-            @Override public void onMessage(GridNioSession ses, byte[] msg) {
-                addMessage(converter.convert(msg));
-            }
-        };
-
-        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
-
-        GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
-            new GridDelimitedParser(delim, directMode);
-
-        if (converter == null)
-            converter = new DefaultConverter<>();
-
-        GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
-
-        GridNioFilter[] filters = new GridNioFilter[] {codec};
-
-        try {
-            srv = new GridNioServer.Builder<byte[]>()
-                .address(addr == null ? InetAddress.getLocalHost() : addr)
-                .port(port)
-                .listener(lsnr)
-                .logger(log)
-                .selectorCount(threads)
-                .byteOrder(byteOrder)
-                .filters(filters)
-                .build();
-        }
-        catch (IgniteCheckedException | UnknownHostException e) {
-            throw new IgniteException(e);
-        }
-
-        srv.start();
-
-        if (log.isDebugEnabled())
-            log.debug("Socket streaming server started on " + addr + ':' + port);
-    }
-
-    /**
-     * Stops streamer.
-     */
-    public void stop() {
-        srv.stop();
-
-        if (log.isDebugEnabled())
-            log.debug("Socket streaming server stopped");
-    }
-
-    /**
-     * Converts message to Java object using Jdk marshaller.
-     */
-    private static class DefaultConverter<T> implements SocketMessageConverter<T> {
-        /** Marshaller. */
-        private static final JdkMarshaller MARSH = new JdkMarshaller();
-
-        /** {@inheritDoc} */
-        @Override public T convert(byte[] msg) {
-            try {
-                return MARSH.unmarshal(msg, null);
-            }
-            catch (IgniteCheckedException e) {
-                throw new IgniteException(e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
new file mode 100644
index 0000000..07ce77e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.stream.*;
+
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.nio.*;
+
+/**
+ * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
+ * streams into {@link IgniteDataStreamer} instance.
+ * <p>
+ * By default server uses size-based message processing. That is every message sent over the socket is prepended with
+ * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
+ * delimiter-based message processing will be used. That is every message sent over the socket is appended with
+ * provided delimiter.
+ * <p>
+ * Received messages through socket converts to Java object using standard serialization. Conversion functionality
+ * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
+ * non Java clients).
+ */
+public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+    /** Default threads. */
+    private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
+
+    /** Logger. */
+    private IgniteLogger log;
+
+    /** Address. */
+    private InetAddress addr;
+
+    /** Server port. */
+    private int port;
+
+    /** Threads number. */
+    private int threads = DFLT_THREADS;
+
+    /** Direct mode. */
+    private boolean directMode;
+
+    /** Delimiter. */
+    private byte[] delim;
+
+    /** Converter. */
+    private SocketMessageConverter<T> converter;
+
+    /** Server. */
+    private GridNioServer<byte[]> srv;
+
+    /**
+     * Sets server address.
+     *
+     * @param addr Address.
+     */
+    public void setAddr(InetAddress addr) {
+        this.addr = addr;
+    }
+
+    /**
+     * Sets port number.
+     *
+     * @param port Port.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Sets threadds amount.
+     *
+     * @param threads Threads.
+     */
+    public void setThreads(int threads) {
+        this.threads = threads;
+    }
+
+    /**
+     * Sets direct mode flag.
+     *
+     * @param directMode Direct mode.
+     */
+    public void setDirectMode(boolean directMode) {
+        this.directMode = directMode;
+    }
+
+    /**
+     * Sets message delimiter.
+     *
+     * @param delim Delimiter.
+     */
+    public void setDelimiter(byte[] delim) {
+        this.delim = delim;
+    }
+
+    /**
+     * Sets message converter.
+     *
+     * @param converter Converter.
+     */
+    public void setConverter(SocketMessageConverter<T> converter) {
+        this.converter = converter;
+    }
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        A.notNull(getTupleExtractor(), "tupleExtractor");
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
+        A.ensure(threads > 0, "threads > 0");
+
+        log = getIgnite().log();
+
+        GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
+            @Override public void onConnected(GridNioSession ses) {
+                assert ses.accepted();
+
+                if (log.isDebugEnabled())
+                    log.debug("Accepted connection: " + ses.remoteAddress());
+            }
+
+            @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                if (e != null)
+                    log.error("Connection failed with exception", e);
+            }
+
+            @Override public void onMessage(GridNioSession ses, byte[] msg) {
+                addMessage(converter.convert(msg));
+            }
+        };
+
+        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+        GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
+            new GridDelimitedParser(delim, directMode);
+
+        if (converter == null)
+            converter = new DefaultConverter<>();
+
+        GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+        GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+        try {
+            srv = new GridNioServer.Builder<byte[]>()
+                .address(addr == null ? InetAddress.getLocalHost() : addr)
+                .port(port)
+                .listener(lsnr)
+                .logger(log)
+                .selectorCount(threads)
+                .byteOrder(byteOrder)
+                .filters(filters)
+                .build();
+        }
+        catch (IgniteCheckedException | UnknownHostException e) {
+            throw new IgniteException(e);
+        }
+
+        srv.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server started on " + addr + ':' + port);
+    }
+
+    /**
+     * Stops streamer.
+     */
+    public void stop() {
+        srv.stop();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server stopped");
+    }
+
+    /**
+     * Converts message to Java object using Jdk marshaller.
+     */
+    private static class DefaultConverter<T> implements SocketMessageConverter<T> {
+        /** Marshaller. */
+        private static final JdkMarshaller MARSH = new JdkMarshaller();
+
+        /** {@inheritDoc} */
+        @Override public T convert(byte[] msg) {
+            try {
+                return MARSH.unmarshal(msg, null);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
deleted file mode 100644
index 19852ce..0000000
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Tests {@link IgniteSocketStreamer}.
- */
-public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest {
-    /** IP finder. */
-    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
-    /** Grid count. */
-    private final static int GRID_CNT = 3;
-
-    /** Count. */
-    private static final int CNT = 500;
-
-    /** Delimiter. */
-    private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0};
-
-    /** Port. */
-    private static int port;
-
-    /** Ignite. */
-    private static Ignite ignite;
-
-    /** {@inheritDoc} */
-    @Override protected IgniteConfiguration getConfiguration() throws Exception {
-        IgniteConfiguration cfg = super.getConfiguration();
-
-        CacheConfiguration ccfg = cacheConfiguration(cfg, null);
-
-        cfg.setCacheConfiguration(ccfg);
-
-        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
-        discoSpi.setIpFinder(IP_FINDER);
-
-        cfg.setDiscoverySpi(discoSpi);
-
-        return cfg;
-    }
-
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTestsStarted() throws Exception {
-        ignite = startGrids(GRID_CNT);
-        ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
-
-        try (ServerSocket sock = new ServerSocket(0)) {
-            port = sock.getLocalPort();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void afterTestsStopped() throws Exception {
-        stopAllGrids();
-    }
-
-    /** {@inheritDoc} */
-    @Override protected void beforeTest() throws Exception {
-        ignite.cache(null).clear();
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSizeBasedDefaultConverter() throws Exception {
-        test(null, null, new Runnable() {
-            @Override public void run() {
-                try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
-                     OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
-                    Marshaller marsh = new JdkMarshaller();
-
-                    for (int i = 0; i < CNT; i++) {
-                        byte[] msg = marsh.marshal(new Tuple(i));
-
-                        os.write(msg.length >>> 24);
-                        os.write(msg.length >>> 16);
-                        os.write(msg.length >>> 8);
-                        os.write(msg.length);
-
-                        os.write(msg);
-                    }
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testSizeBasedCustomConverter() throws Exception {
-        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
-            @Override public Tuple convert(byte[] msg) {
-                int i = (msg[0] & 0xFF) << 24;
-                i |= (msg[1] & 0xFF) << 16;
-                i |= (msg[2] & 0xFF) << 8;
-                i |= msg[3] & 0xFF;
-
-                return new Tuple(i);
-            }
-        };
-
-        test(converter, null, new Runnable() {
-            @Override public void run() {
-                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
-                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
-
-                    for (int i = 0; i < CNT; i++) {
-                        os.write(0);
-                        os.write(0);
-                        os.write(0);
-                        os.write(4);
-
-                        os.write(i >>> 24);
-                        os.write(i >>> 16);
-                        os.write(i >>> 8);
-                        os.write(i);
-                    }
-                }
-                catch (IOException e) {
-                    throw new IgniteException(e);
-                }
-            }
-        });
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testDelimiterBasedDefaultConverter() throws Exception {
-        test(null, DELIM, new Runnable() {
-            @Override public void run() {
-                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
-                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
-                    Marshaller marsh = new JdkMarshaller();
-
-                    for (int i = 0; i < CNT; i++) {
-                        byte[] msg = marsh.marshal(new Tuple(i));
-
-                        os.write(msg);
-                        os.write(DELIM);
-                    }
-                }
-                catch (IOException | IgniteCheckedException e) {
-                    throw new IgniteException(e);
-                }
-            }
-        });
-
-    }
-
-    /**
-     * @throws Exception If failed.
-     */
-    public void testDelimiterBasedCustomConverter() throws Exception {
-        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
-            @Override public Tuple convert(byte[] msg) {
-                int i = (msg[0] & 0xFF) << 24;
-                i |= (msg[1] & 0xFF) << 16;
-                i |= (msg[2] & 0xFF) << 8;
-                i |= msg[3] & 0xFF;
-
-                return new Tuple(i);
-            }
-        };
-
-        test(converter, DELIM, new Runnable() {
-            @Override public void run() {
-                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
-                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
-
-                    for (int i = 0; i < CNT; i++) {
-                        os.write(i >>> 24);
-                        os.write(i >>> 16);
-                        os.write(i >>> 8);
-                        os.write(i);
-
-                        os.write(DELIM);
-                    }
-                }
-                catch (IOException e) {
-                    throw new IgniteException(e);
-                }
-            }
-        });
-    }
-
-    /**
-     * @param converter Converter.
-     * @param r Runnable..
-     */
-    private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
-    {
-        IgniteSocketStreamer<Tuple, Integer, String> sockStmr = null;
-
-        try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
-
-            stmr.allowOverwrite(true);
-            stmr.autoFlushFrequency(10);
-
-            sockStmr = new IgniteSocketStreamer<>();
-
-            IgniteCache<Integer, String> cache = ignite.cache(null);
-
-            sockStmr.setIgnite(ignite);
-
-            sockStmr.setStreamer(stmr);
-
-            sockStmr.setPort(port);
-
-            sockStmr.setDelimiter(delim);
-
-            sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
-                @Override public Map.Entry<Integer, String> extract(Tuple msg) {
-                    return new IgniteBiTuple<>(msg.key, msg.val);
-                }
-            });
-
-            if (converter != null)
-                sockStmr.setConverter(converter);
-
-            final CountDownLatch latch = new CountDownLatch(CNT);
-
-            IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
-                @Override public boolean apply(UUID uuid, CacheEvent evt) {
-                    latch.countDown();
-
-                    return true;
-                }
-            };
-
-            ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
-
-            sockStmr.start();
-
-            r.run();
-
-            latch.await();
-
-            assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
-
-            for (int i = 0; i < CNT; i++)
-                assertEquals(Integer.toString(i), cache.get(i));
-        }
-        finally {
-            if (sockStmr != null)
-                sockStmr.stop();
-        }
-
-    }
-
-    /**
-     * Tuple.
-     */
-    private static class Tuple implements Serializable {
-        /** Serial version uid. */
-        private static final long serialVersionUID = 0L;
-
-        /** Key. */
-        private final int key;
-
-        /** Value. */
-        private final String val;
-
-        /**
-         * @param key Key.
-         */
-        Tuple(int key) {
-            this.key = key;
-            this.val = Integer.toString(key);
-        }
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
new file mode 100644
index 0000000..752e43c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests {@link SocketStreamer}.
+ */
+public class SocketStreamerSelfTest extends GridCommonAbstractTest {
+    /** IP finder. */
+    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+    /** Grid count. */
+    private final static int GRID_CNT = 3;
+
+    /** Count. */
+    private static final int CNT = 500;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0};
+
+    /** Port. */
+    private static int port;
+
+    /** Ignite. */
+    private static Ignite ignite;
+
+    /** {@inheritDoc} */
+    @Override protected IgniteConfiguration getConfiguration() throws Exception {
+        IgniteConfiguration cfg = super.getConfiguration();
+
+        CacheConfiguration ccfg = cacheConfiguration(cfg, null);
+
+        cfg.setCacheConfiguration(ccfg);
+
+        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+        discoSpi.setIpFinder(IP_FINDER);
+
+        cfg.setDiscoverySpi(discoSpi);
+
+        return cfg;
+    }
+
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTestsStarted() throws Exception {
+        ignite = startGrids(GRID_CNT);
+        ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+        try (ServerSocket sock = new ServerSocket(0)) {
+            port = sock.getLocalPort();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void afterTestsStopped() throws Exception {
+        stopAllGrids();
+    }
+
+    /** {@inheritDoc} */
+    @Override protected void beforeTest() throws Exception {
+        ignite.cache(null).clear();
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSizeBasedDefaultConverter() throws Exception {
+        test(null, null, new Runnable() {
+            @Override public void run() {
+                try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                     OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+                    Marshaller marsh = new JdkMarshaller();
+
+                    for (int i = 0; i < CNT; i++) {
+                        byte[] msg = marsh.marshal(new Tuple(i));
+
+                        os.write(msg.length >>> 24);
+                        os.write(msg.length >>> 16);
+                        os.write(msg.length >>> 8);
+                        os.write(msg.length);
+
+                        os.write(msg);
+                    }
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testSizeBasedCustomConverter() throws Exception {
+        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+            @Override public Tuple convert(byte[] msg) {
+                int i = (msg[0] & 0xFF) << 24;
+                i |= (msg[1] & 0xFF) << 16;
+                i |= (msg[2] & 0xFF) << 8;
+                i |= msg[3] & 0xFF;
+
+                return new Tuple(i);
+            }
+        };
+
+        test(converter, null, new Runnable() {
+            @Override public void run() {
+                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+                    for (int i = 0; i < CNT; i++) {
+                        os.write(0);
+                        os.write(0);
+                        os.write(0);
+                        os.write(4);
+
+                        os.write(i >>> 24);
+                        os.write(i >>> 16);
+                        os.write(i >>> 8);
+                        os.write(i);
+                    }
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDelimiterBasedDefaultConverter() throws Exception {
+        test(null, DELIM, new Runnable() {
+            @Override public void run() {
+                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+                    Marshaller marsh = new JdkMarshaller();
+
+                    for (int i = 0; i < CNT; i++) {
+                        byte[] msg = marsh.marshal(new Tuple(i));
+
+                        os.write(msg);
+                        os.write(DELIM);
+                    }
+                }
+                catch (IOException | IgniteCheckedException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+
+    }
+
+    /**
+     * @throws Exception If failed.
+     */
+    public void testDelimiterBasedCustomConverter() throws Exception {
+        SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+            @Override public Tuple convert(byte[] msg) {
+                int i = (msg[0] & 0xFF) << 24;
+                i |= (msg[1] & 0xFF) << 16;
+                i |= (msg[2] & 0xFF) << 8;
+                i |= msg[3] & 0xFF;
+
+                return new Tuple(i);
+            }
+        };
+
+        test(converter, DELIM, new Runnable() {
+            @Override public void run() {
+                try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+                    OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+                    for (int i = 0; i < CNT; i++) {
+                        os.write(i >>> 24);
+                        os.write(i >>> 16);
+                        os.write(i >>> 8);
+                        os.write(i);
+
+                        os.write(DELIM);
+                    }
+                }
+                catch (IOException e) {
+                    throw new IgniteException(e);
+                }
+            }
+        });
+    }
+
+    /**
+     * @param converter Converter.
+     * @param r Runnable..
+     */
+    private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+    {
+        SocketStreamer<Tuple, Integer, String> sockStmr = null;
+
+        try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
+
+            stmr.allowOverwrite(true);
+            stmr.autoFlushFrequency(10);
+
+            sockStmr = new SocketStreamer<>();
+
+            IgniteCache<Integer, String> cache = ignite.cache(null);
+
+            sockStmr.setIgnite(ignite);
+
+            sockStmr.setStreamer(stmr);
+
+            sockStmr.setPort(port);
+
+            sockStmr.setDelimiter(delim);
+
+            sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
+                @Override public Map.Entry<Integer, String> extract(Tuple msg) {
+                    return new IgniteBiTuple<>(msg.key, msg.val);
+                }
+            });
+
+            if (converter != null)
+                sockStmr.setConverter(converter);
+
+            final CountDownLatch latch = new CountDownLatch(CNT);
+
+            IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+                @Override public boolean apply(UUID uuid, CacheEvent evt) {
+                    latch.countDown();
+
+                    return true;
+                }
+            };
+
+            ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+            sockStmr.start();
+
+            r.run();
+
+            latch.await();
+
+            assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
+
+            for (int i = 0; i < CNT; i++)
+                assertEquals(Integer.toString(i), cache.get(i));
+        }
+        finally {
+            if (sockStmr != null)
+                sockStmr.stop();
+        }
+
+    }
+
+    /**
+     * Tuple.
+     */
+    private static class Tuple implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0L;
+
+        /** Key. */
+        private final int key;
+
+        /** Value. */
+        private final String val;
+
+        /**
+         * @param key Key.
+         */
+        Tuple(int key) {
+            this.key = key;
+            this.val = Integer.toString(key);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
index 87bbfbb..61be976 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
@@ -32,7 +32,7 @@ public class IgniteStreamTestSuite extends TestSuite {
     public static TestSuite suite() throws Exception {
         TestSuite suite = new TestSuite("Ignite Stream Test Suite");
 
-        suite.addTest(new TestSuite(IgniteSocketStreamerSelfTest.class));
+        suite.addTest(new TestSuite(SocketStreamerSelfTest.class));
 
         return suite;
     }


[04/21] incubator-ignite git commit: Merge remote-tracking branch 'origin/ignite-sprint-4' into ignite-sprint-4

Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-4' into ignite-sprint-4


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9c30fba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9c30fba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9c30fba4

Branch: refs/heads/ignite-866
Commit: 9c30fba4d817c1ac8b16351b29163a5eaec6d7d5
Parents: a8a0062 478cc7b
Author: avinogradov <av...@gridgain.com>
Authored: Tue May 12 20:07:39 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue May 12 20:07:39 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/communication/tcp/TcpCommunicationSpi.java    | 2 +-
 .../ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java     | 8 ++++----
 2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------



[07/21] incubator-ignite git commit: # ignite-669 - streaming design.

Posted by sb...@apache.org.
# ignite-669 - streaming design.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0cbe3c68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0cbe3c68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0cbe3c68

Branch: refs/heads/ignite-866
Commit: 0cbe3c68dcea09a39fd0e97909bbbaa6b1091a6b
Parents: 56e67e8
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Thu Apr 2 02:45:48 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:15 2015 +0300

----------------------------------------------------------------------
 .../ignite/stream/adapters/StreamAdapter.java   | 73 ++++++++++++++++++++
 .../stream/adapters/StreamTupleExtractor.java   | 33 +++++++++
 .../ignite/stream/adapters/package-info.java    | 21 ++++++
 3 files changed, 127 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cbe3c68/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
new file mode 100644
index 0000000..02ae795
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.adapters;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Convenience adapter for streamers. Adapters are optional components for
+ * streaming from different data sources. The purpose of adapters is to
+ * convert different message formats into Ignite stream key-value tuples
+ * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ */
+public abstract class StreamAdapter<T, K, V> {
+    /** Tuple extractor. */
+    private final StreamTupleExtractor<T, K, V> extractor;
+
+    /** Streamer. */
+    private final IgniteDataStreamer<K, V> stmr;
+
+    /**
+     * Stream adapter.
+     *
+     * @param stmr Streamer.
+     * @param extractor Tuple extractor.
+     */
+    protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+        this.stmr = stmr;
+        this.extractor = extractor;
+    }
+
+    /**
+     * @return Provided data streamer.
+     */
+    public IgniteDataStreamer<K, V> streamer() {
+        return stmr;
+    }
+
+    /**
+     * @return Provided tuple extractor.
+     */
+    public StreamTupleExtractor<T, K, V> converter() {
+        return extractor;
+    }
+
+    /**
+     * Converts given message to a tuple and adds it to the underlying streamer.
+     *
+     * @param msg Message to convert.
+     */
+    protected void addMessage(T msg) {
+        Map.Entry<K, V> e = extractor.extract(msg);
+
+        if (e != null)
+            stmr.addData(e);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cbe3c68/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
new file mode 100644
index 0000000..9b0c395
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.adapters;
+
+import java.util.*;
+
+/**
+ * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ */
+public interface StreamTupleExtractor<T, K, V> {
+    /**
+     * Extracts a key-value tuple from a message.
+     *
+     * @param msg Message.
+     * @return Key-value tuple.
+     */
+    public Map.Entry<K, V> extract(T msg);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cbe3c68/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
new file mode 100644
index 0000000..a69ffc0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains Ignite stream adapters.
+ */
+package org.apache.ignite.stream.adapters;


[16/21] incubator-ignite git commit: Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-sprint-5

Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-sprint-5


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/04774b5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/04774b5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/04774b5f

Branch: refs/heads/ignite-866
Commit: 04774b5f2e2df2f83bef66d1a1c686662ebee04d
Parents: 896b426 9c30fba
Author: avinogradov <av...@gridgain.com>
Authored: Fri May 15 12:12:02 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Fri May 15 12:12:02 2015 +0300

----------------------------------------------------------------------
 .../ignite/spi/communication/tcp/TcpCommunicationSpi.java    | 2 +-
 .../ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java     | 8 ++++----
 pom.xml                                                      | 2 ++
 3 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04774b5f/pom.xml
----------------------------------------------------------------------


[08/21] incubator-ignite git commit: # ignite-669 - streaming design.

Posted by sb...@apache.org.
# ignite-669 - streaming design.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/be64e1dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/be64e1dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/be64e1dd

Branch: refs/heads/ignite-866
Commit: be64e1dd1f7aba02b664c4be6f2753cdafbbdba6
Parents: 0cbe3c6
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Thu Apr 2 03:10:54 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:18 2015 +0300

----------------------------------------------------------------------
 .../ignite/stream/adapters/StreamAdapter.java   | 29 +++++++++++++++++---
 1 file changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be64e1dd/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index 02ae795..f2e0da9 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -29,10 +29,17 @@ import java.util.*;
  */
 public abstract class StreamAdapter<T, K, V> {
     /** Tuple extractor. */
-    private final StreamTupleExtractor<T, K, V> extractor;
+    private StreamTupleExtractor<T, K, V> extractor;
 
     /** Streamer. */
-    private final IgniteDataStreamer<K, V> stmr;
+    private IgniteDataStreamer<K, V> stmr;
+
+    /**
+     * Empty constructor.
+     */
+    public StreamAdapter() {
+        // No-op.
+    }
 
     /**
      * Stream adapter.
@@ -48,18 +55,32 @@ public abstract class StreamAdapter<T, K, V> {
     /**
      * @return Provided data streamer.
      */
-    public IgniteDataStreamer<K, V> streamer() {
+    public IgniteDataStreamer<K, V> getStreamer() {
         return stmr;
     }
 
     /**
+     * @param stmr Ignite data streamer.
+     */
+    public void setStreamer(IgniteDataStreamer<K, V> stmr) {
+        this.stmr = stmr;
+    }
+
+    /**
      * @return Provided tuple extractor.
      */
-    public StreamTupleExtractor<T, K, V> converter() {
+    public StreamTupleExtractor<T, K, V> getConverter() {
         return extractor;
     }
 
     /**
+     * @param extractor Extractor for key-value tuples from messages.
+     */
+    public void setExtractor(StreamTupleExtractor<T, K, V> extractor) {
+        this.extractor = extractor;
+    }
+
+    /**
      * Converts given message to a tuple and adds it to the underlying streamer.
      *
      * @param msg Message to convert.


[17/21] incubator-ignite git commit: # IGNITE-904: Implemented interop start routine.

Posted by sb...@apache.org.
# IGNITE-904: Implemented interop start routine.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/15aa1cfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/15aa1cfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/15aa1cfa

Branch: refs/heads/ignite-866
Commit: 15aa1cfac4bd30c1d613c941d5afc994f34aa8e3
Parents: 04774b5f
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri May 15 17:28:34 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri May 15 17:28:34 2015 +0300

----------------------------------------------------------------------
 .../org/apache/ignite/internal/IgnitionEx.java  | 136 ++++++-------------
 .../internal/interop/InteropBootstrap.java      |  34 +++++
 .../interop/InteropBootstrapFactory.java        |  39 ++++++
 .../internal/interop/InteropIgnition.java       | 103 ++++++++++++++
 .../internal/interop/InteropProcessor.java      |  25 ++++
 5 files changed, 240 insertions(+), 97 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 8d88677..d54e06f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -532,22 +532,6 @@ public class IgnitionEx {
     }
 
     /**
-     * Start Grid passing a closure which will modify configuration before it is passed to start routine.
-     *
-     * @param springCfgPath Spring config path.
-     * @param gridName Grid name.
-     * @param cfgClo Configuration closure.
-     * @return Started Grid.
-     * @throws IgniteCheckedException If failed.
-     */
-    public static Ignite startWithClosure(@Nullable String springCfgPath, @Nullable String gridName,
-        IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo) throws IgniteCheckedException {
-        URL url = U.resolveSpringUrl(springCfgPath);
-
-        return start(url, gridName, null, cfgClo);
-    }
-
-    /**
      * Loads all grid configurations specified within given Spring XML configuration file.
      * <p>
      * Usually Spring XML configuration file will contain only one Grid definition. Note that
@@ -734,7 +718,40 @@ public class IgnitionEx {
      */
     public static Ignite start(URL springCfgUrl, @Nullable String gridName,
         @Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException {
-        return start(springCfgUrl, gridName, springCtx, null);
+        A.notNull(springCfgUrl, "springCfgUrl");
+
+        boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
+
+        IgniteBiTuple<Object, Object> t = null;
+
+        if (isLog4jUsed) {
+            try {
+                t = U.addLog4jNoOpLogger();
+            }
+            catch (IgniteCheckedException ignore) {
+                isLog4jUsed = false;
+            }
+        }
+
+        Collection<Handler> savedHnds = null;
+
+        if (!isLog4jUsed)
+            savedHnds = U.addJavaNoOpLogger();
+
+        IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> cfgMap;
+
+        try {
+            cfgMap = loadConfigurations(springCfgUrl);
+        }
+        finally {
+            if (isLog4jUsed && t != null)
+                U.removeLog4jNoOpLogger(t);
+
+            if (!isLog4jUsed)
+                U.removeJavaNoOpLogger(savedHnds);
+        }
+
+        return startConfigurations(cfgMap, springCfgUrl, gridName, springCtx);
     }
 
     /**
@@ -780,73 +797,6 @@ public class IgnitionEx {
      */
     public static Ignite start(InputStream springCfgStream, @Nullable String gridName,
         @Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException {
-        return start(springCfgStream, gridName, springCtx, null);
-    }
-
-    /**
-     * Internal Spring-based start routine.
-     *
-     * @param springCfgUrl Spring XML configuration file URL. This cannot be {@code null}.
-     * @param gridName Grid name that will override default.
-     * @param springCtx Optional Spring application context.
-     * @param cfgClo Optional closure to change configuration before it is used to start the grid.
-     * @return Started grid.
-     * @throws IgniteCheckedException If failed.
-     */
-    private static Ignite start(final URL springCfgUrl, @Nullable String gridName,
-        @Nullable GridSpringResourceContext springCtx,
-        @Nullable IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo)
-        throws IgniteCheckedException {
-        A.notNull(springCfgUrl, "springCfgUrl");
-
-        boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
-
-        IgniteBiTuple<Object, Object> t = null;
-
-        if (isLog4jUsed) {
-            try {
-                t = U.addLog4jNoOpLogger();
-            }
-            catch (IgniteCheckedException ignore) {
-                isLog4jUsed = false;
-            }
-        }
-
-        Collection<Handler> savedHnds = null;
-
-        if (!isLog4jUsed)
-            savedHnds = U.addJavaNoOpLogger();
-
-        IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> cfgMap;
-
-        try {
-            cfgMap = loadConfigurations(springCfgUrl);
-        }
-        finally {
-            if (isLog4jUsed && t != null)
-                U.removeLog4jNoOpLogger(t);
-
-            if (!isLog4jUsed)
-                U.removeJavaNoOpLogger(savedHnds);
-        }
-
-        return startConfigurations(cfgMap, springCfgUrl, gridName, springCtx, cfgClo);
-    }
-
-    /**
-     * Internal Spring-based start routine.
-     *
-     * @param springCfgStream Input stream containing Spring XML configuration. This cannot be {@code null}.
-     * @param gridName Grid name that will override default.
-     * @param springCtx Optional Spring application context.
-     * @param cfgClo Optional closure to change configuration before it is used to start the grid.
-     * @return Started grid.
-     * @throws IgniteCheckedException If failed.
-     */
-    private static Ignite start(final InputStream springCfgStream, @Nullable String gridName,
-        @Nullable GridSpringResourceContext springCtx,
-        @Nullable IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo)
-        throws IgniteCheckedException {
         A.notNull(springCfgStream, "springCfgUrl");
 
         boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
@@ -880,7 +830,7 @@ public class IgnitionEx {
                 U.removeJavaNoOpLogger(savedHnds);
         }
 
-        return startConfigurations(cfgMap, null, gridName, springCtx, cfgClo);
+        return startConfigurations(cfgMap, null, gridName, springCtx);
     }
 
     /**
@@ -890,7 +840,6 @@ public class IgnitionEx {
      * @param springCfgUrl Spring XML configuration file URL.
      * @param gridName Grid name that will override default.
      * @param springCtx Optional Spring application context.
-     * @param cfgClo Optional closure to change configuration before it is used to start the grid.
      * @return Started grid.
      * @throws IgniteCheckedException If failed.
      */
@@ -898,8 +847,7 @@ public class IgnitionEx {
         IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> cfgMap,
         URL springCfgUrl,
         @Nullable String gridName,
-        @Nullable GridSpringResourceContext springCtx,
-        @Nullable IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo)
+        @Nullable GridSpringResourceContext springCtx)
         throws IgniteCheckedException {
         List<IgniteNamedInstance> grids = new ArrayList<>(cfgMap.size());
 
@@ -910,12 +858,6 @@ public class IgnitionEx {
                 if (cfg.getGridName() == null && !F.isEmpty(gridName))
                     cfg.setGridName(gridName);
 
-                if (cfgClo != null) {
-                    cfg = cfgClo.apply(cfg);
-
-                    assert cfg != null;
-                }
-
                 // Use either user defined context or our one.
                 IgniteNamedInstance grid = start0(
                     new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx));
@@ -1600,9 +1542,9 @@ public class IgnitionEx {
                     igfsExecSvc, restExecSvc,
                     new CA() {
                         @Override public void apply() {
-                        startLatch.countDown();
-                    }
-                });
+                            startLatch.countDown();
+                        }
+                    });
 
                 state = STARTED;
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrap.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrap.java
new file mode 100644
index 0000000..820bef9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrap.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.interop;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Interop bootstrap. Responsible for starting Ignite node in interop mode.
+ */
+public interface InteropBootstrap {
+    /**
+     * Start Ignite node.
+     *
+     * @param cfg Configuration.
+     * @param envPtr Environment pointer.
+     * @return Ignite node.
+     */
+    public InteropProcessor start(IgniteConfiguration cfg, long envPtr);
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrapFactory.java
new file mode 100644
index 0000000..b61ca89
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrapFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.interop;
+
+import java.io.*;
+
+/**
+ * Interop bootstrap factory.
+ */
+public interface InteropBootstrapFactory extends Serializable {
+    /**
+     * Get bootstrap factory ID.
+     *
+     * @return ID.
+     */
+    public int id();
+
+    /**
+     * Create bootstrap instance.
+     *
+     * @return Bootstrap instance.
+     */
+    public InteropBootstrap create();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
new file mode 100644
index 0000000..f245122
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.interop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.resource.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.security.*;
+import java.util.*;
+
+/**
+ * Entry point for interop nodes.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class InteropIgnition {
+    /**
+     * Start Ignite node in interop mode.
+     *
+     * @param springCfgPath Spring configuration path.
+     * @param gridName Grid name.
+     * @param factoryId Factory ID.
+     * @param envPtr Environment pointer.
+     * @return Ignite instance.
+     */
+    public static InteropProcessor start(@Nullable String springCfgPath, @Nullable String gridName, int factoryId,
+        long envPtr) {
+        IgniteConfiguration cfg = configuration(springCfgPath);
+
+        if (gridName != null)
+            cfg.setGridName(gridName);
+
+        InteropBootstrap bootstrap = bootstrap(factoryId);
+
+        return bootstrap.start(cfg, envPtr);
+    }
+
+    private static IgniteConfiguration configuration(@Nullable String springCfgPath) {
+        try {
+            URL url = springCfgPath == null ? U.resolveIgniteUrl(IgnitionEx.DFLT_CFG) :
+                U.resolveSpringUrl(springCfgPath);
+
+            IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> t = IgnitionEx.loadConfiguration(url);
+
+            return t.get1();
+        }
+        catch (IgniteCheckedException e) {
+            throw new IgniteException("Failed to instantiate configuration from Spring XML: " + springCfgPath, e);
+        }
+    }
+
+    /**
+     * Create bootstrap for the given factory ID.
+     *
+     * @param factoryId Factory ID.
+     * @return Bootstrap.
+     */
+    private static InteropBootstrap bootstrap(final int factoryId) {
+        InteropBootstrapFactory factory = AccessController.doPrivileged(
+            new PrivilegedAction<InteropBootstrapFactory>() {
+            @Override public InteropBootstrapFactory run() {
+                for (InteropBootstrapFactory factory : ServiceLoader.load(InteropBootstrapFactory.class)) {
+                    if (factory.id() == factoryId)
+                        return factory;
+                }
+
+                return null;
+            }
+        });
+
+        if (factory == null)
+            throw new IgniteException("Interop factory is not found (did you put into the classpath?): " + factoryId);
+
+        return factory.create();
+    }
+
+    /**
+     * Private constructor.
+     */
+    private InteropIgnition() {
+        // No-op.
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropProcessor.java
new file mode 100644
index 0000000..6c55296
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropProcessor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.interop;
+
+/**
+ * Interop processor.
+ */
+public interface InteropProcessor {
+    // No-op.
+}