You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/15 10:50:37 UTC
[08/11] incubator-ignite git commit: ignite-430 Words count Socket
streamer examples
ignite-430 Words count Socket streamer examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7ee85179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7ee85179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7ee85179
Branch: refs/heads/ignite-614
Commit: 7ee85179103df637ba594fab68755dee71b69997
Parents: d87efce
Author: agura <ag...@gridgain.com>
Authored: Wed May 13 20:56:22 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:34 2015 +0300
----------------------------------------------------------------------
.../ignite/examples/streaming/package-info.java | 1 -
.../streaming/socket/SocketStreamerExample.java | 128 --------
.../socket/WordsSocketStreamerClient.java | 86 +++++
.../socket/WordsSocketStreamerServer.java | 93 ++++++
.../socket/ZStringsSocketStreamerExample.java | 141 ---------
.../socket/ZWordsSocketStreamerClient.java | 81 +++++
.../socket/ZWordsSocketStreamerServer.java | 111 +++++++
.../examples/streaming/socket/package-info.java | 3 +-
.../streaming/wordcount/CacheConfig.java | 2 +-
.../streaming/wordcount/QueryWords.java | 2 +-
.../streaming/wordcount/StreamWords.java | 2 +-
.../streaming/wordcount/package-info.java | 1 -
.../org/apache/ignite/stream/StreamAdapter.java | 111 +++++++
.../ignite/stream/StreamTupleExtractor.java | 33 ++
.../ignite/stream/adapters/StreamAdapter.java | 111 -------
.../stream/adapters/StreamTupleExtractor.java | 33 --
.../ignite/stream/adapters/package-info.java | 21 --
.../stream/socket/IgniteSocketStreamer.java | 217 -------------
.../ignite/stream/socket/SocketStreamer.java | 218 +++++++++++++
.../socket/IgniteSocketStreamerSelfTest.java | 315 -------------------
.../stream/socket/SocketStreamerSelfTest.java | 315 +++++++++++++++++++
.../testsuites/IgniteStreamTestSuite.java | 2 +-
22 files changed, 1053 insertions(+), 974 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
index 43dea13..43fbab3 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
@@ -16,7 +16,6 @@
*/
/**
- * <!-- Package description. -->
* Demonstrates usage of data streamer.
*/
package org.apache.ignite.examples.streaming;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
deleted file mode 100644
index 487572a..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message size based protocol.
- * <p>
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start streaming using {@link SocketStreamerExample}.</li>
- * <li>Start querying popular numbers using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class SocketStreamerExample {
- /** Port. */
- private static final int PORT = 5555;
-
- /**
- * @param args Args.
- */
- public static void main(String[] args) throws InterruptedException, IOException {
- // Mark this cluster member as client.
- Ignition.setClientMode(true);
-
- try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
- if (!ExamplesUtils.hasServerNodes(ignite))
- return;
-
- // The cache is configured with sliding window holding 1 second of the streaming data.
- IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
- try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
- InetAddress addr = InetAddress.getLocalHost();
-
- // Configure socket streamer
- IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
-
- sockStmr.setAddr(addr);
-
- sockStmr.setPort(PORT);
-
- sockStmr.setIgnite(ignite);
-
- sockStmr.setStreamer(stmr);
-
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
- @Override public Map.Entry<AffinityUuid, String> extract(String word) {
- // By using AffinityUuid we ensure that identical
- // words are processed on the same cluster node.
- return new IgniteBiTuple<>(new AffinityUuid(word), word);
- }
- });
-
- sockStmr.start();
-
- sendData(addr, PORT);
- }
- }
- }
-
- /**
- * @param addr Address.
- * @param port Port.
- */
- private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
- try (Socket sock = new Socket(addr, port);
- OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
- while (true) {
- try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
- LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
- for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
- for (String word : line.split(" ")) {
- if (!word.isEmpty()) {
- // Stream words into Ignite through socket.
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(bos)) {
-
- // Write message
- out.writeObject(word);
-
- byte[] arr = bos.toByteArray();
-
- // Write message length
- oos.write(arr.length >>> 24);
- oos.write(arr.length >>> 16);
- oos.write(arr.length >>> 8);
- oos.write(arr.length);
-
- oos.write(arr);
- }
- }
- }
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
new file mode 100644
index 0000000..c5ec079
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Sends words to socket server based on {@link SocketStreamer} using message size based protocol.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerClient {
+ /** Port. */
+ private static final int PORT = 5555;
+
+ /**
+ * @param args Args.
+ */
+ public static void main(String[] args) throws IOException {
+ InetAddress addr = InetAddress.getLocalHost();
+
+ try (Socket sock = new Socket(addr, PORT);
+ OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+ System.out.println("Words streaming started.");
+
+ while (true) {
+ try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+ LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+ for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+ for (String word : line.split(" ")) {
+ if (!word.isEmpty()) {
+ // Stream words into Ignite through socket.
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+ // Write message
+ out.writeObject(word);
+
+ byte[] arr = bos.toByteArray();
+
+ // Write message length
+ oos.write(arr.length >>> 24);
+ oos.write(arr.length >>> 16);
+ oos.write(arr.length >>> 8);
+ oos.write(arr.length);
+
+ oos.write(arr);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
new file mode 100644
index 0000000..5af746d
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Receives words through socket using {@link SocketStreamer} and message size based protocol
+ * and streams them into Ignite cache.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerServer {
+ /** Port. */
+ private static final int PORT = 5555;
+
+ /**
+ * @param args Args.
+ */
+ public static void main(String[] args) throws InterruptedException, IOException {
+ // Mark this cluster member as client.
+ Ignition.setClientMode(true);
+
+ Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
+
+ if (!ExamplesUtils.hasServerNodes(ignite)) {
+ ignite.close();
+
+ return;
+ }
+
+ // The cache is configured with sliding window holding 1 second of the streaming data.
+ IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+ IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
+
+ InetAddress addr = InetAddress.getLocalHost();
+
+ // Configure socket streamer
+ SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
+
+ sockStmr.setAddr(addr);
+
+ sockStmr.setPort(PORT);
+
+ sockStmr.setIgnite(ignite);
+
+ sockStmr.setStreamer(stmr);
+
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+ // By using AffinityUuid we ensure that identical
+ // words are processed on the same cluster node.
+ return new IgniteBiTuple<>(new AffinityUuid(word), word);
+ }
+ });
+
+ sockStmr.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
deleted file mode 100644
index fa5aa28..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message delimiter based
- * protocol.
- * <p>
- * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams
- * zero-terminated strings.
- * <p>
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
- * <li>Start querying popular numbers using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class ZStringsSocketStreamerExample {
- /** Port. */
- private static final int PORT = 5555;
-
- /** Delimiter. */
- private static final byte[] DELIM = new byte[] {0};
-
- /**
- * @param args Args.
- */
- public static void main(String[] args) throws InterruptedException, IOException {
- // Mark this cluster member as client.
- Ignition.setClientMode(true);
-
- try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
- if (!ExamplesUtils.hasServerNodes(ignite))
- return;
-
- // The cache is configured with sliding window holding 1 second of the streaming data.
- IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
- try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
- InetAddress addr = InetAddress.getLocalHost();
-
- // Configure socket streamer
- IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
-
- sockStmr.setAddr(addr);
-
- sockStmr.setPort(PORT);
-
- sockStmr.setDelimiter(DELIM);
-
- sockStmr.setIgnite(ignite);
-
- sockStmr.setStreamer(stmr);
-
- // Converter from zero-terminated string to Java strings.
- sockStmr.setConverter(new SocketMessageConverter<String>() {
- @Override public String convert(byte[] msg) {
- try {
- return new String(msg, "ASCII");
- }
- catch (UnsupportedEncodingException e) {
- throw new IgniteException(e);
- }
- }
- });
-
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
- @Override public Map.Entry<AffinityUuid, String> extract(String word) {
- // By using AffinityUuid we ensure that identical
- // words are processed on the same cluster node.
- return new IgniteBiTuple<>(new AffinityUuid(word), word);
- }
- });
-
- sockStmr.start();
-
- sendData(addr, PORT);
- }
- }
- }
-
- /**
- * @param addr Address.
- * @param port Port.
- */
- private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
- try (Socket sock = new Socket(addr, port);
- OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
-
- while (true) {
- try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
- LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
- for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
- for (String word : line.split(" ")) {
- if (!word.isEmpty()) {
- // Stream words into Ignite through socket.
- byte[] arr = word.getBytes("ASCII");
-
- // Write message
- oos.write(arr);
-
- // Write message delimiter
- oos.write(DELIM);
- }
- }
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
new file mode 100644
index 0000000..c17ccdc
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
+ * Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZWordsSocketStreamerClient {
+ /** Port. */
+ private static final int PORT = 5555;
+
+ /** Delimiter. */
+ private static final byte[] DELIM = new byte[] {0};
+
+ /**
+ * @param args Args.
+ */
+ public static void main(String[] args) throws IOException {
+ InetAddress addr = InetAddress.getLocalHost();
+
+ try (Socket sock = new Socket(addr, PORT);
+ OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+ System.out.println("Words streaming started.");
+
+ while (true) {
+ try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+ LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+ for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+ for (String word : line.split(" ")) {
+ if (!word.isEmpty()) {
+ // Stream words into Ignite through socket.
+ byte[] arr = word.getBytes("ASCII");
+
+ // Write message
+ oos.write(arr);
+
+ // Write message delimiter
+ oos.write(DELIM);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
new file mode 100644
index 0000000..a0ef9da
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
+ * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZWordsSocketStreamerServer {
+ /** Port. */
+ private static final int PORT = 5555;
+
+ /** Delimiter. */
+ private static final byte[] DELIM = new byte[] {0};
+
+ /**
+ * @param args Args.
+ */
+ public static void main(String[] args) throws InterruptedException, IOException {
+ // Mark this cluster member as client.
+ Ignition.setClientMode(true);
+
+ Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
+
+ if (!ExamplesUtils.hasServerNodes(ignite)) {
+ ignite.close();
+
+ return;
+ }
+
+ // The cache is configured with sliding window holding 1 second of the streaming data.
+ IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+ IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
+
+ InetAddress addr = InetAddress.getLocalHost();
+
+ // Configure socket streamer
+ SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
+
+ sockStmr.setAddr(addr);
+
+ sockStmr.setPort(PORT);
+
+ sockStmr.setDelimiter(DELIM);
+
+ sockStmr.setIgnite(ignite);
+
+ sockStmr.setStreamer(stmr);
+
+ // Converter from zero-terminated string to Java strings.
+ sockStmr.setConverter(new SocketMessageConverter<String>() {
+ @Override public String convert(byte[] msg) {
+ try {
+ return new String(msg, "ASCII");
+ }
+ catch (UnsupportedEncodingException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+ // By using AffinityUuid we ensure that identical
+ // words are processed on the same cluster node.
+ return new IgniteBiTuple<>(new AffinityUuid(word), word);
+ }
+ });
+
+ sockStmr.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
index d0a480a..c516ab4 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -16,7 +16,6 @@
*/
/**
- * <!-- Package description. -->
- * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples.
+ * Contains {@link org.apache.ignite.stream.socket.SocketStreamer} usage examples.
*/
package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
index 58704ca..d17b97d 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
@@ -26,7 +26,7 @@ import javax.cache.expiry.*;
import static java.util.concurrent.TimeUnit.*;
/**
- * Configuration for the streaming cache to store the stream of random numbers.
+ * Configuration for the streaming cache to store the stream of words.
* This cache is configured with sliding window of 1 second, which means that
* data older than 1 second will be automatically removed from the cache.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
index 3bd9d3d..149aa79 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -30,7 +30,7 @@ import java.util.*;
* <ul>
* <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
* <li>Start streaming using {@link StreamWords}.</li>
- * <li>Start querying popular numbers using {@link QueryWords}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
* <p>
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
index c59fa51..cc3c0cb 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -29,7 +29,7 @@ import java.io.*;
* <ul>
* <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
* <li>Start streaming using {@link StreamWords}.</li>
- * <li>Start querying popular numbers using {@link QueryWords}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
* <p>
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
index 010f86a..5d48ae3 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
@@ -16,7 +16,6 @@
*/
/**
- * <!-- Package description. -->
* Streaming word count example.
*/
package org.apache.ignite.examples.streaming.wordcount;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
new file mode 100644
index 0000000..0c4e2d1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Convenience adapter for streamers. Adapters are optional components for
+ * streaming from different data sources. The purpose of adapters is to
+ * convert different message formats into Ignite stream key-value tuples
+ * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ */
+public abstract class StreamAdapter<T, K, V> {
+ /** Tuple extractor. */
+ private StreamTupleExtractor<T, K, V> extractor;
+
+ /** Streamer. */
+ private IgniteDataStreamer<K, V> stmr;
+
+ /** Ignite. */
+ private Ignite ignite;
+
+ /**
+ * Empty constructor.
+ */
+ protected StreamAdapter() {
+ // No-op.
+ }
+
+ /**
+ * Stream adapter.
+ *
+ * @param stmr Streamer.
+ * @param extractor Tuple extractor.
+ */
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+ this.stmr = stmr;
+ this.extractor = extractor;
+ }
+
+ /**
+ * @return Provided data streamer.
+ */
+ public IgniteDataStreamer<K, V> getStreamer() {
+ return stmr;
+ }
+
+ /**
+ * @param stmr Ignite data streamer.
+ */
+ public void setStreamer(IgniteDataStreamer<K, V> stmr) {
+ this.stmr = stmr;
+ }
+
+ /**
+ * @return Provided tuple extractor.
+ */
+ public StreamTupleExtractor<T, K, V> getTupleExtractor() {
+ return extractor;
+ }
+
+ /**
+ * @param extractor Extractor for key-value tuples from messages.
+ */
+ public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
+ this.extractor = extractor;
+ }
+
+ /**
+ * @return Provided {@link Ignite} instance.
+ */
+ public Ignite getIgnite() {
+ return ignite;
+ }
+
+ /**
+ * @param ignite {@link Ignite} instance.
+ */
+ public void setIgnite(Ignite ignite) {
+ this.ignite = ignite;
+ }
+
+ /**
+ * Converts given message to a tuple and adds it to the underlying streamer.
+ *
+ * @param msg Message to convert.
+ */
+ protected void addMessage(T msg) {
+ Map.Entry<K, V> e = extractor.extract(msg);
+
+ if (e != null)
+ stmr.addData(e);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
new file mode 100644
index 0000000..d2a4ede
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.*;
+
+/**
+ * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ */
+public interface StreamTupleExtractor<T, K, V> {
+ /**
+ * Extracts a key-value tuple from a message.
+ *
+ * @param msg Message.
+ * @return Key-value tuple.
+ */
+ public Map.Entry<K, V> extract(T msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
deleted file mode 100644
index b99521a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.adapters;
-
-import org.apache.ignite.*;
-
-import java.util.*;
-
-/**
- * Convenience adapter for streamers. Adapters are optional components for
- * streaming from different data sources. The purpose of adapters is to
- * convert different message formats into Ignite stream key-value tuples
- * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
- */
-public abstract class StreamAdapter<T, K, V> {
- /** Tuple extractor. */
- private StreamTupleExtractor<T, K, V> extractor;
-
- /** Streamer. */
- private IgniteDataStreamer<K, V> stmr;
-
- /** Ignite. */
- private Ignite ignite;
-
- /**
- * Empty constructor.
- */
- protected StreamAdapter() {
- // No-op.
- }
-
- /**
- * Stream adapter.
- *
- * @param stmr Streamer.
- * @param extractor Tuple extractor.
- */
- protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
- this.stmr = stmr;
- this.extractor = extractor;
- }
-
- /**
- * @return Provided data streamer.
- */
- public IgniteDataStreamer<K, V> getStreamer() {
- return stmr;
- }
-
- /**
- * @param stmr Ignite data streamer.
- */
- public void setStreamer(IgniteDataStreamer<K, V> stmr) {
- this.stmr = stmr;
- }
-
- /**
- * @return Provided tuple extractor.
- */
- public StreamTupleExtractor<T, K, V> getTupleExtractor() {
- return extractor;
- }
-
- /**
- * @param extractor Extractor for key-value tuples from messages.
- */
- public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
- this.extractor = extractor;
- }
-
- /**
- * @return Provided {@link Ignite} instance.
- */
- public Ignite getIgnite() {
- return ignite;
- }
-
- /**
- * @param ignite {@link Ignite} instance.
- */
- public void setIgnite(Ignite ignite) {
- this.ignite = ignite;
- }
-
- /**
- * Converts given message to a tuple and adds it to the underlying streamer.
- *
- * @param msg Message to convert.
- */
- protected void addMessage(T msg) {
- Map.Entry<K, V> e = extractor.extract(msg);
-
- if (e != null)
- stmr.addData(e);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
deleted file mode 100644
index 9b0c395..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.adapters;
-
-import java.util.*;
-
-/**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- */
-public interface StreamTupleExtractor<T, K, V> {
- /**
- * Extracts a key-value tuple from a message.
- *
- * @param msg Message.
- * @return Key-value tuple.
- */
- public Map.Entry<K, V> extract(T msg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
deleted file mode 100644
index a69ffc0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains Ignite stream adapters.
- */
-package org.apache.ignite.stream.adapters;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
deleted file mode 100644
index 66369ea..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.stream.adapters.*;
-import org.jetbrains.annotations.*;
-
-import java.net.*;
-import java.nio.*;
-
-/**
- * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
- * streams into {@link IgniteDataStreamer} instance.
- * <p>
- * By default server uses size-based message processing. That is every message sent over the socket is prepended with
- * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
- * delimiter-based message processing will be used. That is every message sent over the socket is appended with
- * provided delimiter.
- * <p>
- * Received messages through socket converts to Java object using standard serialization. Conversion functionality
- * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
- * non Java clients).
- */
-public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
- /** Default threads. */
- private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Address. */
- private InetAddress addr;
-
- /** Server port. */
- private int port;
-
- /** Threads number. */
- private int threads = DFLT_THREADS;
-
- /** Direct mode. */
- private boolean directMode;
-
- /** Delimiter. */
- private byte[] delim;
-
- /** Converter. */
- private SocketMessageConverter<T> converter;
-
- /** Server. */
- private GridNioServer<byte[]> srv;
-
- /**
- * Sets server address.
- *
- * @param addr Address.
- */
- public void setAddr(InetAddress addr) {
- this.addr = addr;
- }
-
- /**
- * Sets port number.
- *
- * @param port Port.
- */
- public void setPort(int port) {
- this.port = port;
- }
-
- /**
- * Sets threadds amount.
- *
- * @param threads Threads.
- */
- public void setThreads(int threads) {
- this.threads = threads;
- }
-
- /**
- * Sets direct mode flag.
- *
- * @param directMode Direct mode.
- */
- public void setDirectMode(boolean directMode) {
- this.directMode = directMode;
- }
-
- /**
- * Sets message delimiter.
- *
- * @param delim Delimiter.
- */
- public void setDelimiter(byte[] delim) {
- this.delim = delim;
- }
-
- /**
- * Sets message converter.
- *
- * @param converter Converter.
- */
- public void setConverter(SocketMessageConverter<T> converter) {
- this.converter = converter;
- }
-
- /**
- * Starts streamer.
- *
- * @throws IgniteException If failed.
- */
- public void start() {
- A.notNull(getTupleExtractor(), "tupleExtractor");
- A.notNull(getStreamer(), "streamer");
- A.notNull(getIgnite(), "ignite");
- A.ensure(threads > 0, "threads > 0");
-
- log = getIgnite().log();
-
- GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
- @Override public void onConnected(GridNioSession ses) {
- assert ses.accepted();
-
- if (log.isDebugEnabled())
- log.debug("Accepted connection: " + ses.remoteAddress());
- }
-
- @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- if (e != null)
- log.error("Connection failed with exception", e);
- }
-
- @Override public void onMessage(GridNioSession ses, byte[] msg) {
- addMessage(converter.convert(msg));
- }
- };
-
- ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
-
- GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
- new GridDelimitedParser(delim, directMode);
-
- if (converter == null)
- converter = new DefaultConverter<>();
-
- GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
-
- GridNioFilter[] filters = new GridNioFilter[] {codec};
-
- try {
- srv = new GridNioServer.Builder<byte[]>()
- .address(addr == null ? InetAddress.getLocalHost() : addr)
- .port(port)
- .listener(lsnr)
- .logger(log)
- .selectorCount(threads)
- .byteOrder(byteOrder)
- .filters(filters)
- .build();
- }
- catch (IgniteCheckedException | UnknownHostException e) {
- throw new IgniteException(e);
- }
-
- srv.start();
-
- if (log.isDebugEnabled())
- log.debug("Socket streaming server started on " + addr + ':' + port);
- }
-
- /**
- * Stops streamer.
- */
- public void stop() {
- srv.stop();
-
- if (log.isDebugEnabled())
- log.debug("Socket streaming server stopped");
- }
-
- /**
- * Converts message to Java object using Jdk marshaller.
- */
- private static class DefaultConverter<T> implements SocketMessageConverter<T> {
- /** Marshaller. */
- private static final JdkMarshaller MARSH = new JdkMarshaller();
-
- /** {@inheritDoc} */
- @Override public T convert(byte[] msg) {
- try {
- return MARSH.unmarshal(msg, null);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
new file mode 100644
index 0000000..07ce77e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.stream.*;
+
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.nio.*;
+
+/**
+ * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
+ * streams into {@link IgniteDataStreamer} instance.
+ * <p>
+ * By default server uses size-based message processing. That is every message sent over the socket is prepended with
+ * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
+ * delimiter-based message processing will be used. That is every message sent over the socket is appended with
+ * provided delimiter.
+ * <p>
+ * Received messages through socket converts to Java object using standard serialization. Conversion functionality
+ * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
+ * non Java clients).
+ */
+public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+ /** Default threads. */
+ private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Address. */
+ private InetAddress addr;
+
+ /** Server port. */
+ private int port;
+
+ /** Threads number. */
+ private int threads = DFLT_THREADS;
+
+ /** Direct mode. */
+ private boolean directMode;
+
+ /** Delimiter. */
+ private byte[] delim;
+
+ /** Converter. */
+ private SocketMessageConverter<T> converter;
+
+ /** Server. */
+ private GridNioServer<byte[]> srv;
+
+ /**
+ * Sets server address.
+ *
+ * @param addr Address.
+ */
+ public void setAddr(InetAddress addr) {
+ this.addr = addr;
+ }
+
+ /**
+ * Sets port number.
+ *
+ * @param port Port.
+ */
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ /**
+ * Sets threadds amount.
+ *
+ * @param threads Threads.
+ */
+ public void setThreads(int threads) {
+ this.threads = threads;
+ }
+
+ /**
+ * Sets direct mode flag.
+ *
+ * @param directMode Direct mode.
+ */
+ public void setDirectMode(boolean directMode) {
+ this.directMode = directMode;
+ }
+
+ /**
+ * Sets message delimiter.
+ *
+ * @param delim Delimiter.
+ */
+ public void setDelimiter(byte[] delim) {
+ this.delim = delim;
+ }
+
+ /**
+ * Sets message converter.
+ *
+ * @param converter Converter.
+ */
+ public void setConverter(SocketMessageConverter<T> converter) {
+ this.converter = converter;
+ }
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() {
+ A.notNull(getTupleExtractor(), "tupleExtractor");
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.ensure(threads > 0, "threads > 0");
+
+ log = getIgnite().log();
+
+ GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
+ @Override public void onConnected(GridNioSession ses) {
+ assert ses.accepted();
+
+ if (log.isDebugEnabled())
+ log.debug("Accepted connection: " + ses.remoteAddress());
+ }
+
+ @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+ if (e != null)
+ log.error("Connection failed with exception", e);
+ }
+
+ @Override public void onMessage(GridNioSession ses, byte[] msg) {
+ addMessage(converter.convert(msg));
+ }
+ };
+
+ ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+ GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
+ new GridDelimitedParser(delim, directMode);
+
+ if (converter == null)
+ converter = new DefaultConverter<>();
+
+ GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+ GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+ try {
+ srv = new GridNioServer.Builder<byte[]>()
+ .address(addr == null ? InetAddress.getLocalHost() : addr)
+ .port(port)
+ .listener(lsnr)
+ .logger(log)
+ .selectorCount(threads)
+ .byteOrder(byteOrder)
+ .filters(filters)
+ .build();
+ }
+ catch (IgniteCheckedException | UnknownHostException e) {
+ throw new IgniteException(e);
+ }
+
+ srv.start();
+
+ if (log.isDebugEnabled())
+ log.debug("Socket streaming server started on " + addr + ':' + port);
+ }
+
+ /**
+ * Stops streamer.
+ */
+ public void stop() {
+ srv.stop();
+
+ if (log.isDebugEnabled())
+ log.debug("Socket streaming server stopped");
+ }
+
+ /**
+ * Converts message to Java object using Jdk marshaller.
+ */
+ private static class DefaultConverter<T> implements SocketMessageConverter<T> {
+ /** Marshaller. */
+ private static final JdkMarshaller MARSH = new JdkMarshaller();
+
+ /** {@inheritDoc} */
+ @Override public T convert(byte[] msg) {
+ try {
+ return MARSH.unmarshal(msg, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
deleted file mode 100644
index 19852ce..0000000
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Tests {@link IgniteSocketStreamer}.
- */
-public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest {
- /** IP finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Grid count. */
- private final static int GRID_CNT = 3;
-
- /** Count. */
- private static final int CNT = 500;
-
- /** Delimiter. */
- private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0};
-
- /** Port. */
- private static int port;
-
- /** Ignite. */
- private static Ignite ignite;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration() throws Exception {
- IgniteConfiguration cfg = super.getConfiguration();
-
- CacheConfiguration ccfg = cacheConfiguration(cfg, null);
-
- cfg.setCacheConfiguration(ccfg);
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(discoSpi);
-
- return cfg;
- }
-
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- ignite = startGrids(GRID_CNT);
- ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
-
- try (ServerSocket sock = new ServerSocket(0)) {
- port = sock.getLocalPort();
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- ignite.cache(null).clear();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testSizeBasedDefaultConverter() throws Exception {
- test(null, null, new Runnable() {
- @Override public void run() {
- try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
- OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
- Marshaller marsh = new JdkMarshaller();
-
- for (int i = 0; i < CNT; i++) {
- byte[] msg = marsh.marshal(new Tuple(i));
-
- os.write(msg.length >>> 24);
- os.write(msg.length >>> 16);
- os.write(msg.length >>> 8);
- os.write(msg.length);
-
- os.write(msg);
- }
- }
- catch (IOException | IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- });
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testSizeBasedCustomConverter() throws Exception {
- SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
- @Override public Tuple convert(byte[] msg) {
- int i = (msg[0] & 0xFF) << 24;
- i |= (msg[1] & 0xFF) << 16;
- i |= (msg[2] & 0xFF) << 8;
- i |= msg[3] & 0xFF;
-
- return new Tuple(i);
- }
- };
-
- test(converter, null, new Runnable() {
- @Override public void run() {
- try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
- OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
-
- for (int i = 0; i < CNT; i++) {
- os.write(0);
- os.write(0);
- os.write(0);
- os.write(4);
-
- os.write(i >>> 24);
- os.write(i >>> 16);
- os.write(i >>> 8);
- os.write(i);
- }
- }
- catch (IOException e) {
- throw new IgniteException(e);
- }
- }
- });
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDelimiterBasedDefaultConverter() throws Exception {
- test(null, DELIM, new Runnable() {
- @Override public void run() {
- try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
- OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
- Marshaller marsh = new JdkMarshaller();
-
- for (int i = 0; i < CNT; i++) {
- byte[] msg = marsh.marshal(new Tuple(i));
-
- os.write(msg);
- os.write(DELIM);
- }
- }
- catch (IOException | IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- });
-
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDelimiterBasedCustomConverter() throws Exception {
- SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
- @Override public Tuple convert(byte[] msg) {
- int i = (msg[0] & 0xFF) << 24;
- i |= (msg[1] & 0xFF) << 16;
- i |= (msg[2] & 0xFF) << 8;
- i |= msg[3] & 0xFF;
-
- return new Tuple(i);
- }
- };
-
- test(converter, DELIM, new Runnable() {
- @Override public void run() {
- try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
- OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
-
- for (int i = 0; i < CNT; i++) {
- os.write(i >>> 24);
- os.write(i >>> 16);
- os.write(i >>> 8);
- os.write(i);
-
- os.write(DELIM);
- }
- }
- catch (IOException e) {
- throw new IgniteException(e);
- }
- }
- });
- }
-
- /**
- * @param converter Converter.
- * @param r Runnable..
- */
- private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
- {
- IgniteSocketStreamer<Tuple, Integer, String> sockStmr = null;
-
- try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
-
- stmr.allowOverwrite(true);
- stmr.autoFlushFrequency(10);
-
- sockStmr = new IgniteSocketStreamer<>();
-
- IgniteCache<Integer, String> cache = ignite.cache(null);
-
- sockStmr.setIgnite(ignite);
-
- sockStmr.setStreamer(stmr);
-
- sockStmr.setPort(port);
-
- sockStmr.setDelimiter(delim);
-
- sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
- @Override public Map.Entry<Integer, String> extract(Tuple msg) {
- return new IgniteBiTuple<>(msg.key, msg.val);
- }
- });
-
- if (converter != null)
- sockStmr.setConverter(converter);
-
- final CountDownLatch latch = new CountDownLatch(CNT);
-
- IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
- @Override public boolean apply(UUID uuid, CacheEvent evt) {
- latch.countDown();
-
- return true;
- }
- };
-
- ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
-
- sockStmr.start();
-
- r.run();
-
- latch.await();
-
- assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
-
- for (int i = 0; i < CNT; i++)
- assertEquals(Integer.toString(i), cache.get(i));
- }
- finally {
- if (sockStmr != null)
- sockStmr.stop();
- }
-
- }
-
- /**
- * Tuple.
- */
- private static class Tuple implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** Key. */
- private final int key;
-
- /** Value. */
- private final String val;
-
- /**
- * @param key Key.
- */
- Tuple(int key) {
- this.key = key;
- this.val = Integer.toString(key);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
new file mode 100644
index 0000000..752e43c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests {@link SocketStreamer}.
+ */
+public class SocketStreamerSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Grid count. */
+ private final static int GRID_CNT = 3;
+
+ /** Count. */
+ private static final int CNT = 500;
+
+ /** Delimiter. */
+ private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0};
+
+ /** Port. */
+ private static int port;
+
+ /** Ignite. */
+ private static Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration() throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration();
+
+ CacheConfiguration ccfg = cacheConfiguration(cfg, null);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ ignite = startGrids(GRID_CNT);
+ ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ try (ServerSocket sock = new ServerSocket(0)) {
+ port = sock.getLocalPort();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ ignite.cache(null).clear();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSizeBasedDefaultConverter() throws Exception {
+ test(null, null, new Runnable() {
+ @Override public void run() {
+ try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+ Marshaller marsh = new JdkMarshaller();
+
+ for (int i = 0; i < CNT; i++) {
+ byte[] msg = marsh.marshal(new Tuple(i));
+
+ os.write(msg.length >>> 24);
+ os.write(msg.length >>> 16);
+ os.write(msg.length >>> 8);
+ os.write(msg.length);
+
+ os.write(msg);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSizeBasedCustomConverter() throws Exception {
+ SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+ @Override public Tuple convert(byte[] msg) {
+ int i = (msg[0] & 0xFF) << 24;
+ i |= (msg[1] & 0xFF) << 16;
+ i |= (msg[2] & 0xFF) << 8;
+ i |= msg[3] & 0xFF;
+
+ return new Tuple(i);
+ }
+ };
+
+ test(converter, null, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+ for (int i = 0; i < CNT; i++) {
+ os.write(0);
+ os.write(0);
+ os.write(0);
+ os.write(4);
+
+ os.write(i >>> 24);
+ os.write(i >>> 16);
+ os.write(i >>> 8);
+ os.write(i);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDelimiterBasedDefaultConverter() throws Exception {
+ test(null, DELIM, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+ Marshaller marsh = new JdkMarshaller();
+
+ for (int i = 0; i < CNT; i++) {
+ byte[] msg = marsh.marshal(new Tuple(i));
+
+ os.write(msg);
+ os.write(DELIM);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDelimiterBasedCustomConverter() throws Exception {
+ SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+ @Override public Tuple convert(byte[] msg) {
+ int i = (msg[0] & 0xFF) << 24;
+ i |= (msg[1] & 0xFF) << 16;
+ i |= (msg[2] & 0xFF) << 8;
+ i |= msg[3] & 0xFF;
+
+ return new Tuple(i);
+ }
+ };
+
+ test(converter, DELIM, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+ for (int i = 0; i < CNT; i++) {
+ os.write(i >>> 24);
+ os.write(i >>> 16);
+ os.write(i >>> 8);
+ os.write(i);
+
+ os.write(DELIM);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * @param converter Converter.
+ * @param r Runnable..
+ */
+ private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+ {
+ SocketStreamer<Tuple, Integer, String> sockStmr = null;
+
+ try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
+
+ stmr.allowOverwrite(true);
+ stmr.autoFlushFrequency(10);
+
+ sockStmr = new SocketStreamer<>();
+
+ IgniteCache<Integer, String> cache = ignite.cache(null);
+
+ sockStmr.setIgnite(ignite);
+
+ sockStmr.setStreamer(stmr);
+
+ sockStmr.setPort(port);
+
+ sockStmr.setDelimiter(delim);
+
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
+ @Override public Map.Entry<Integer, String> extract(Tuple msg) {
+ return new IgniteBiTuple<>(msg.key, msg.val);
+ }
+ });
+
+ if (converter != null)
+ sockStmr.setConverter(converter);
+
+ final CountDownLatch latch = new CountDownLatch(CNT);
+
+ IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+ sockStmr.start();
+
+ r.run();
+
+ latch.await();
+
+ assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
+
+ for (int i = 0; i < CNT; i++)
+ assertEquals(Integer.toString(i), cache.get(i));
+ }
+ finally {
+ if (sockStmr != null)
+ sockStmr.stop();
+ }
+
+ }
+
+ /**
+ * Tuple.
+ */
+ private static class Tuple implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Key. */
+ private final int key;
+
+ /** Value. */
+ private final String val;
+
+ /**
+ * @param key Key.
+ */
+ Tuple(int key) {
+ this.key = key;
+ this.val = Integer.toString(key);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
index 87bbfbb..61be976 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
@@ -32,7 +32,7 @@ public class IgniteStreamTestSuite extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Ignite Stream Test Suite");
- suite.addTest(new TestSuite(IgniteSocketStreamerSelfTest.class));
+ suite.addTest(new TestSuite(SocketStreamerSelfTest.class));
return suite;
}