You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by an...@apache.org on 2015/05/15 05:03:51 UTC

[46/50] incubator-ignite git commit: ignite-430 Words count Socket streamer examples

ignite-430 Words count Socket streamer examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d87efce0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d87efce0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d87efce0

Branch: refs/heads/ignite-843
Commit: d87efce0af1ad269ea1a4182329f441b6ff32122
Parents: 53995dc
Author: agura <ag...@gridgain.com>
Authored: Wed May 13 18:38:48 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:31 2015 +0300

----------------------------------------------------------------------
 .../streaming/socket/SocketStreamerExample.java | 112 +++++++------------
 .../socket/ZStringsSocketStreamerExample.java   |  76 ++++++-------
 .../examples/streaming/socket/package-info.java |   1 +
 3 files changed, 75 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
index 73cb970..487572a 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
@@ -18,36 +18,30 @@
 package org.apache.ignite.examples.streaming.socket;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
 import org.apache.ignite.stream.adapters.*;
 import org.apache.ignite.stream.socket.*;
 
-import javax.cache.processor.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
 
 /**
- * Streams random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message size based protocol.
+ * <p>
  * To start the example, you should:
  * <ul>
- *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *      <li>Start streaming using {@link SocketStreamerExample}.</li>
- *      <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link SocketStreamerExample}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
  * </ul>
  * <p>
  * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class SocketStreamerExample {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** Range within which to generate numbers. */
-    private static final int RANGE = 1000;
-
     /** Port. */
     private static final int PORT = 5555;
 
@@ -63,27 +57,13 @@ public class SocketStreamerExample {
                 return;
 
             // The cache is configured with sliding window holding 1 second of the streaming data.
-            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
-            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
-                // Allow data updates.
-                stmr.allowOverwrite(true);
-
-                // Configure data transformation to count instances of the same word.
-                stmr.receiver(new StreamTransformer<Integer, Long>() {
-                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
-                        throws EntryProcessorException {
-                        Long val = e.getValue();
-
-                        e.setValue(val == null ? 1L : val + 1);
-
-                        return null;
-                    }
-                });
+            IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
 
+            try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
                 InetAddress addr = InetAddress.getLocalHost();
 
-                IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+                // Configure socket streamer
+                IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
 
                 sockStmr.setAddr(addr);
 
@@ -93,9 +73,11 @@ public class SocketStreamerExample {
 
                 sockStmr.setStreamer(stmr);
 
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() {
-                    @Override public Map.Entry<Integer, Long> extract(Tuple tuple) {
-                        return new IgniteBiTuple<>(tuple.key, tuple.cnt);
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+                    @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+                        // By using AffinityUuid we ensure that identical
+                        // words are processed on the same cluster node.
+                        return new IgniteBiTuple<>(new AffinityUuid(word), word);
                     }
                 });
 
@@ -114,45 +96,33 @@ public class SocketStreamerExample {
         try (Socket sock = new Socket(addr, port);
              OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
             while (true) {
-                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
-                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
-                    Tuple tuple = new Tuple(RAND.nextInt(RANGE), 1L);
-
-                    out.writeObject(tuple);
-
-                    byte[] arr = bos.toByteArray();
-
-                    oos.write(arr.length >>> 24);
-                    oos.write(arr.length >>> 16);
-                    oos.write(arr.length >>> 8);
-                    oos.write(arr.length);
-
-                    oos.write(arr);
+                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+                        for (String word : line.split(" ")) {
+                            if (!word.isEmpty()) {
+                                // Stream words into Ignite through socket.
+                                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+                                    // Write message
+                                    out.writeObject(word);
+
+                                    byte[] arr = bos.toByteArray();
+
+                                    // Write message length
+                                    oos.write(arr.length >>> 24);
+                                    oos.write(arr.length >>> 16);
+                                    oos.write(arr.length >>> 8);
+                                    oos.write(arr.length);
+
+                                    oos.write(arr);
+                                }
+                            }
+                        }
+                    }
                 }
             }
         }
     }
-
-    /**
-     * Tuple.
-     */
-    private static class Tuple implements Serializable {
-        /** Serial version uid. */
-        private static final long serialVersionUID = 0;
-
-        /** Key. */
-        private final int key;
-
-        /** Count. */
-        private final long cnt;
-
-        /**
-         * @param key Key.
-         * @param cnt Count.
-         */
-        public Tuple(int key, long cnt) {
-            this.key = key;
-            this.cnt = cnt;
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
index a535c73..fa5aa28 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
@@ -18,40 +18,34 @@
 package org.apache.ignite.examples.streaming.socket;
 
 import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
 import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
 import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
 import org.apache.ignite.stream.adapters.*;
 import org.apache.ignite.stream.socket.*;
 
-import javax.cache.processor.*;
 import java.io.*;
 import java.net.*;
 import java.util.*;
 
 /**
- * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message delimiter based
+ * protocol.
  * <p>
  * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams
  * zero-terminated strings.
  * <p>
  * To start the example, you should:
  * <ul>
- *      <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- *      <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
- *      <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ *     <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ *     <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
+ *     <li>Start querying popular words using {@link QueryWords}.</li>
  * </ul>
  * <p>
  * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
  */
 public class ZStringsSocketStreamerExample {
-    /** Random number generator. */
-    private static final Random RAND = new Random();
-
-    /** Range within which to generate numbers. */
-    private static final int RANGE = 1000;
-
     /** Port. */
     private static final int PORT = 5555;
 
@@ -70,27 +64,13 @@ public class ZStringsSocketStreamerExample {
                 return;
 
             // The cache is configured with sliding window holding 1 second of the streaming data.
-            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
-            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
-                // Allow data updates.
-                stmr.allowOverwrite(true);
-
-                // Configure data transformation to count instances of the same word.
-                stmr.receiver(new StreamTransformer<Integer, Long>() {
-                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
-                        throws EntryProcessorException {
-                        Long val = e.getValue();
-
-                        e.setValue(val == null ? 1L : val + 1);
-
-                        return null;
-                    }
-                });
+            IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
 
+            try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
                 InetAddress addr = InetAddress.getLocalHost();
 
-                IgniteSocketStreamer<String, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+                // Configure socket streamer
+                IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
 
                 sockStmr.setAddr(addr);
 
@@ -114,10 +94,11 @@ public class ZStringsSocketStreamerExample {
                     }
                 });
 
-                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, Integer, Long>() {
-                    @Override public Map.Entry<Integer, Long> extract(String input) {
-                        String[] pair = input.split("=");
-                        return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+                    @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+                        // By using AffinityUuid we ensure that identical
+                        // words are processed on the same cluster node.
+                        return new IgniteBiTuple<>(new AffinityUuid(word), word);
                     }
                 });
 
@@ -137,14 +118,23 @@ public class ZStringsSocketStreamerExample {
              OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
 
             while (true) {
-                int key = RAND.nextInt(RANGE);
-
-                String str = key + "=1";
-
-                byte[] arr = str.getBytes("ASCII");
-
-                oos.write(arr);
-                oos.write(DELIM);
+                try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+                     LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+                    for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+                        for (String word : line.split(" ")) {
+                            if (!word.isEmpty()) {
+                                // Stream words into Ignite through socket.
+                                byte[] arr = word.getBytes("ASCII");
+
+                                // Write message
+                                oos.write(arr);
+
+                                // Write message delimiter
+                                oos.write(DELIM);
+                            }
+                        }
+                    }
+                }
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
index ae7bdf9..d0a480a 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -16,6 +16,7 @@
  */
 
 /**
+ * <!-- Package description. -->
  * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples.
  */
 package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file