You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/05/15 17:55:46 UTC
[01/21] incubator-ignite git commit: Changed default timeout:
SocketWrite, Ack and Network timeouts to 200.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-866 f56342fa2 -> d87a38c31 (forced update)
Changed default timeout:
SocketWrite, Ack and Network timeouts to 200.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/de1c80e4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/de1c80e4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/de1c80e4
Branch: refs/heads/ignite-866
Commit: de1c80e4335f39a8888f6db3121b6f8afa6e5fb1
Parents: a4c9653
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue May 12 15:22:19 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue May 12 15:22:19 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de1c80e4/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index 2d5c541..fd17791 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -203,7 +203,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
public static final int DFLT_ACK_SND_THRESHOLD = 16;
/** Default socket write timeout. */
- public static final long DFLT_SOCK_WRITE_TIMEOUT = GridNioServer.DFLT_SES_WRITE_TIMEOUT;
+ public static final long DFLT_SOCK_WRITE_TIMEOUT = 200;
/** No-op runnable. */
private static final IgniteRunnable NOOP = new IgniteRunnable() {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/de1c80e4/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
index 5d28af2..b7e3cd5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java
@@ -54,11 +54,11 @@ abstract class TcpDiscoverySpiAdapter extends IgniteSpiAdapter implements Discov
/** Default socket operations timeout in milliseconds (value is <tt>200ms</tt>). */
public static final long DFLT_SOCK_TIMEOUT = 200;
- /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>5,000ms</tt>). */
- public static final long DFLT_ACK_TIMEOUT = 5000;
+ /** Default timeout for receiving message acknowledgement in milliseconds (value is <tt>200ms</tt>). */
+ public static final long DFLT_ACK_TIMEOUT = 200;
- /** Default network timeout in milliseconds (value is <tt>5,000ms</tt>). */
- public static final long DFLT_NETWORK_TIMEOUT = 5000;
+ /** Default network timeout in milliseconds (value is <tt>200ms</tt>). */
+ public static final long DFLT_NETWORK_TIMEOUT = 200;
/** Default value for thread priority (value is <tt>10</tt>). */
public static final int DFLT_THREAD_PRI = 10;
[03/21] incubator-ignite git commit: 775 perm to sh files
Posted by sb...@apache.org.
775 perm to sh files
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/a8a0062f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/a8a0062f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/a8a0062f
Branch: refs/heads/ignite-866
Commit: a8a0062f0a13d008204ce015aa2526d609a3776f
Parents: b95d2ae
Author: avinogradov <av...@gridgain.com>
Authored: Tue May 12 20:07:18 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue May 12 20:07:18 2015 +0300
----------------------------------------------------------------------
pom.xml | 2 ++
1 file changed, 2 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/a8a0062f/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 0643227..cb790bd 100644
--- a/pom.xml
+++ b/pom.xml
@@ -300,6 +300,8 @@
<substitution expression="${project.version}" />
</replaceregexp>
+ <chmod dir="${basedir}/target/release-package" perm="755" includes="**/*.sh"/>
+
<zip destfile="${basedir}/target/bin/${ignite.zip.pattern}.zip" encoding="UTF-8">
<zipfileset dir="${basedir}/target/release-package" prefix="${ignite.zip.pattern}" filemode="755">
<include name="**/*.sh" />
[09/21] incubator-ignite git commit: # ignite-669 - streaming design.
Posted by sb...@apache.org.
# ignite-669 - streaming design.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/877eb764
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/877eb764
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/877eb764
Branch: refs/heads/ignite-866
Commit: 877eb7649e61ca53765c42b4244a7d46a39d1179
Parents: be64e1d
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Thu Apr 2 03:11:54 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:21 2015 +0300
----------------------------------------------------------------------
.../java/org/apache/ignite/stream/adapters/StreamAdapter.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/877eb764/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index f2e0da9..9d4772f 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -69,14 +69,14 @@ public abstract class StreamAdapter<T, K, V> {
/**
* @return Provided tuple extractor.
*/
- public StreamTupleExtractor<T, K, V> getConverter() {
+ public StreamTupleExtractor<T, K, V> getTupleExtractor() {
return extractor;
}
/**
* @param extractor Extractor for key-value tuples from messages.
*/
- public void setExtractor(StreamTupleExtractor<T, K, V> extractor) {
+ public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
this.extractor = extractor;
}
[12/21] incubator-ignite git commit: ignite-430 Words count Socket
streamer examples
Posted by sb...@apache.org.
ignite-430 Words count Socket streamer examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d87efce0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d87efce0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d87efce0
Branch: refs/heads/ignite-866
Commit: d87efce0af1ad269ea1a4182329f441b6ff32122
Parents: 53995dc
Author: agura <ag...@gridgain.com>
Authored: Wed May 13 18:38:48 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:31 2015 +0300
----------------------------------------------------------------------
.../streaming/socket/SocketStreamerExample.java | 112 +++++++------------
.../socket/ZStringsSocketStreamerExample.java | 76 ++++++-------
.../examples/streaming/socket/package-info.java | 1 +
3 files changed, 75 insertions(+), 114 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
index 73cb970..487572a 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
@@ -18,36 +18,30 @@
package org.apache.ignite.examples.streaming.socket;
import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
import org.apache.ignite.stream.adapters.*;
import org.apache.ignite.stream.socket.*;
-import javax.cache.processor.*;
import java.io.*;
import java.net.*;
import java.util.*;
/**
- * Streams random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message size based protocol.
+ * <p>
* To start the example, you should:
* <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start streaming using {@link SocketStreamerExample}.</li>
- * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start streaming using {@link SocketStreamerExample}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
* <p>
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class SocketStreamerExample {
- /** Random number generator. */
- private static final Random RAND = new Random();
-
- /** Range within which to generate numbers. */
- private static final int RANGE = 1000;
-
/** Port. */
private static final int PORT = 5555;
@@ -63,27 +57,13 @@ public class SocketStreamerExample {
return;
// The cache is configured with sliding window holding 1 second of the streaming data.
- IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
- try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
- // Allow data updates.
- stmr.allowOverwrite(true);
-
- // Configure data transformation to count instances of the same word.
- stmr.receiver(new StreamTransformer<Integer, Long>() {
- @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
- throws EntryProcessorException {
- Long val = e.getValue();
-
- e.setValue(val == null ? 1L : val + 1);
-
- return null;
- }
- });
+ IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+ try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
InetAddress addr = InetAddress.getLocalHost();
- IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+ // Configure socket streamer
+ IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
sockStmr.setAddr(addr);
@@ -93,9 +73,11 @@ public class SocketStreamerExample {
sockStmr.setStreamer(stmr);
- sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() {
- @Override public Map.Entry<Integer, Long> extract(Tuple tuple) {
- return new IgniteBiTuple<>(tuple.key, tuple.cnt);
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+ // By using AffinityUuid we ensure that identical
+ // words are processed on the same cluster node.
+ return new IgniteBiTuple<>(new AffinityUuid(word), word);
}
});
@@ -114,45 +96,33 @@ public class SocketStreamerExample {
try (Socket sock = new Socket(addr, port);
OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
while (true) {
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(bos)) {
- Tuple tuple = new Tuple(RAND.nextInt(RANGE), 1L);
-
- out.writeObject(tuple);
-
- byte[] arr = bos.toByteArray();
-
- oos.write(arr.length >>> 24);
- oos.write(arr.length >>> 16);
- oos.write(arr.length >>> 8);
- oos.write(arr.length);
-
- oos.write(arr);
+ try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+ LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+ for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+ for (String word : line.split(" ")) {
+ if (!word.isEmpty()) {
+ // Stream words into Ignite through socket.
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+ // Write message
+ out.writeObject(word);
+
+ byte[] arr = bos.toByteArray();
+
+ // Write message length
+ oos.write(arr.length >>> 24);
+ oos.write(arr.length >>> 16);
+ oos.write(arr.length >>> 8);
+ oos.write(arr.length);
+
+ oos.write(arr);
+ }
+ }
+ }
+ }
}
}
}
}
-
- /**
- * Tuple.
- */
- private static class Tuple implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0;
-
- /** Key. */
- private final int key;
-
- /** Count. */
- private final long cnt;
-
- /**
- * @param key Key.
- * @param cnt Count.
- */
- public Tuple(int key, long cnt) {
- this.key = key;
- this.cnt = cnt;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
index a535c73..fa5aa28 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
@@ -18,40 +18,34 @@
package org.apache.ignite.examples.streaming.socket;
import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
import org.apache.ignite.stream.adapters.*;
import org.apache.ignite.stream.socket.*;
-import javax.cache.processor.*;
import java.io.*;
import java.net.*;
import java.util.*;
/**
- * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message delimiter based
+ * protocol.
* <p>
* Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams
* zero-terminated strings.
* <p>
* To start the example, you should:
* <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
- * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
* <p>
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class ZStringsSocketStreamerExample {
- /** Random number generator. */
- private static final Random RAND = new Random();
-
- /** Range within which to generate numbers. */
- private static final int RANGE = 1000;
-
/** Port. */
private static final int PORT = 5555;
@@ -70,27 +64,13 @@ public class ZStringsSocketStreamerExample {
return;
// The cache is configured with sliding window holding 1 second of the streaming data.
- IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
-
- try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
- // Allow data updates.
- stmr.allowOverwrite(true);
-
- // Configure data transformation to count instances of the same word.
- stmr.receiver(new StreamTransformer<Integer, Long>() {
- @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
- throws EntryProcessorException {
- Long val = e.getValue();
-
- e.setValue(val == null ? 1L : val + 1);
-
- return null;
- }
- });
+ IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+ try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
InetAddress addr = InetAddress.getLocalHost();
- IgniteSocketStreamer<String, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+ // Configure socket streamer
+ IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
sockStmr.setAddr(addr);
@@ -114,10 +94,11 @@ public class ZStringsSocketStreamerExample {
}
});
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, Integer, Long>() {
- @Override public Map.Entry<Integer, Long> extract(String input) {
- String[] pair = input.split("=");
- return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+ // By using AffinityUuid we ensure that identical
+ // words are processed on the same cluster node.
+ return new IgniteBiTuple<>(new AffinityUuid(word), word);
}
});
@@ -137,14 +118,23 @@ public class ZStringsSocketStreamerExample {
OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
while (true) {
- int key = RAND.nextInt(RANGE);
-
- String str = key + "=1";
-
- byte[] arr = str.getBytes("ASCII");
-
- oos.write(arr);
- oos.write(DELIM);
+ try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+ LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+ for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+ for (String word : line.split(" ")) {
+ if (!word.isEmpty()) {
+ // Stream words into Ignite through socket.
+ byte[] arr = word.getBytes("ASCII");
+
+ // Write message
+ oos.write(arr);
+
+ // Write message delimiter
+ oos.write(DELIM);
+ }
+ }
+ }
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87efce0/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
index ae7bdf9..d0a480a 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -16,6 +16,7 @@
*/
/**
+ * <!-- Package description. -->
* Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples.
*/
package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file
[18/21] incubator-ignite git commit: # minor
Posted by sb...@apache.org.
# minor
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7fa8abcc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7fa8abcc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7fa8abcc
Branch: refs/heads/ignite-866
Commit: 7fa8abcc8aee3fe038cb28e7666b49de6b7be796
Parents: 04774b5f
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 15 17:52:38 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 15 17:52:38 2015 +0300
----------------------------------------------------------------------
.../streaming/wordcount/socket/WordsSocketStreamerServer.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7fa8abcc/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 6a8911c..9e68096 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -114,7 +114,7 @@ public class WordsSocketStreamerServer {
sockStmr.start();
}
catch (IgniteException e) {
- System.out.println("Streaming server didn't start due to an error: ");
+ System.err.println("Streaming server didn't start due to an error: ");
e.printStackTrace();
[06/21] incubator-ignite git commit: # IGNITE-894 Revert adding
@InjectRecursively.
Posted by sb...@apache.org.
# IGNITE-894 Revert adding @InjectRecursively.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/56e67e8f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/56e67e8f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/56e67e8f
Branch: refs/heads/ignite-866
Commit: 56e67e8f7ce808c190346d9228987f25b54b4e6f
Parents: 593e3ee
Author: sevdokimov <se...@gridgain.com>
Authored: Thu May 14 16:38:45 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Thu May 14 16:40:00 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/GridInternalWrapper.java | 30 ++++++++++++++++
.../closure/GridClosureProcessor.java | 25 ++++++++++----
.../internal/processors/igfs/IgfsJobImpl.java | 10 ++++--
.../processors/resource/GridResourceIoc.java | 36 ++++++++++----------
.../resource/GridResourceProcessor.java | 8 ++++-
.../processors/resource/GridResourceUtils.java | 15 ++++++++
.../processors/resource/InjectRecursively.java | 30 ----------------
.../resources/META-INF/classnames.properties | 1 -
8 files changed, 96 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/GridInternalWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridInternalWrapper.java b/modules/core/src/main/java/org/apache/ignite/internal/GridInternalWrapper.java
new file mode 100644
index 0000000..76563e7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridInternalWrapper.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal;
+
+/**
+ * Internal wrapper interface for custom resource injection logic.
+ */
+public interface GridInternalWrapper<T> {
+ /**
+ * Get user object where resources must be injected.
+ *
+ * @return User object.
+ */
+ public T userObject();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
index 8f5afbf..658557e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/closure/GridClosureProcessor.java
@@ -1584,12 +1584,12 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
- private static class C1<T, R> implements ComputeJob, Externalizable {
+ private static class C1<T, R> implements ComputeJob, Externalizable, GridNoImplicitInjection,
+ GridInternalWrapper<IgniteClosure> {
/** */
private static final long serialVersionUID = 0L;
/** */
- @InjectRecursively
protected IgniteClosure<T, R> job;
/** */
@@ -1635,6 +1635,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
+ @Override public IgniteClosure userObject() {
+ return job;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(C1.class, this);
}
@@ -1676,12 +1681,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*
*/
- private static class C2<R> implements ComputeJob, Externalizable {
+ private static class C2<R> implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Callable> {
/** */
private static final long serialVersionUID = 0L;
/** */
- @InjectRecursively
protected Callable<R> c;
/**
@@ -1724,6 +1728,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
+ @Override public Callable userObject() {
+ return c;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(C2.class, this);
}
@@ -1763,12 +1772,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
/**
*/
- private static class C4 implements ComputeJob, Externalizable {
+ private static class C4 implements ComputeJob, Externalizable, GridNoImplicitInjection, GridInternalWrapper<Runnable> {
/** */
private static final long serialVersionUID = 0L;
/** */
- @InjectRecursively
protected Runnable r;
/**
@@ -1808,6 +1816,11 @@ public class GridClosureProcessor extends GridProcessorAdapter {
}
/** {@inheritDoc} */
+ @Override public Runnable userObject() {
+ return r;
+ }
+
+ /** {@inheritDoc} */
@Override public String toString() {
return S.toString(C4.class, this);
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java
index 8f2cfd2..fa90e21 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/igfs/IgfsJobImpl.java
@@ -21,7 +21,7 @@ import org.apache.ignite.*;
import org.apache.ignite.compute.*;
import org.apache.ignite.igfs.*;
import org.apache.ignite.igfs.mapreduce.*;
-import org.apache.ignite.internal.processors.resource.*;
+import org.apache.ignite.internal.*;
import org.apache.ignite.resources.*;
import java.io.*;
@@ -29,12 +29,11 @@ import java.io.*;
/**
* IGFS job implementation.
*/
-public class IgfsJobImpl implements ComputeJob {
+public class IgfsJobImpl implements ComputeJob, GridInternalWrapper<IgfsJob> {
/** */
private static final long serialVersionUID = 0L;
/** IGFS job. */
- @InjectRecursively
private IgfsJob job;
/** IGFS name. */
@@ -110,4 +109,9 @@ public class IgfsJobImpl implements ComputeJob {
@Override public void cancel() {
job.cancel();
}
+
+ /** {@inheritDoc} */
+ @Override public IgfsJob userObject() {
+ return job;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
index ce19664..1e85ecd 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceIoc.java
@@ -272,29 +272,29 @@ class GridResourceIoc {
for (Class cls0 = cls; !cls0.equals(Object.class); cls0 = cls0.getSuperclass()) {
for (Field field : cls0.getDeclaredFields()) {
- InjectRecursively injectRecursively = field.getAnnotation(InjectRecursively.class);
+ Annotation[] fieldAnns = field.getAnnotations();
- if (injectRecursively != null
- || (allowImplicitInjection && field.getName().startsWith("this$")
- || field.getName().startsWith("val$"))) {
- field.setAccessible(true);
+ for (Annotation ann : fieldAnns) {
+ T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
- recursiveFieldsList.add(field);
- }
- else {
- for (Annotation ann : field.getAnnotations()) {
- T2<List<GridResourceField>, List<GridResourceMethod>> t2 = annMap.get(ann.annotationType());
+ if (t2 == null) {
+ t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
+ new ArrayList<GridResourceField>(),
+ new ArrayList<GridResourceMethod>());
- if (t2 == null) {
- t2 = new T2<List<GridResourceField>, List<GridResourceMethod>>(
- new ArrayList<GridResourceField>(),
- new ArrayList<GridResourceMethod>());
+ annMap.put(ann.annotationType(), t2);
+ }
- annMap.put(ann.annotationType(), t2);
- }
+ t2.get1().add(new GridResourceField(field, ann));
+ }
- t2.get1().add(new GridResourceField(field, ann));
- }
+ if (allowImplicitInjection
+ && fieldAnns.length == 0
+ && GridResourceUtils.mayRequireResources(field)) {
+ field.setAccessible(true);
+
+ // Account for anonymous inner classes.
+ recursiveFieldsList.add(field);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
index 5b51592..f5ba492 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceProcessor.java
@@ -23,7 +23,6 @@ import org.apache.ignite.compute.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.managers.deployment.*;
import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.*;
import org.apache.ignite.lifecycle.*;
import org.apache.ignite.resources.*;
@@ -281,6 +280,13 @@ public class GridResourceProcessor extends GridProcessorAdapter {
Object obj = unwrapTarget(job);
injectToJob(dep, taskCls, obj, ses, jobCtx);
+
+ if (obj instanceof GridInternalWrapper) {
+ Object usrObj = ((GridInternalWrapper)obj).userObject();
+
+ if (usrObj != null)
+ injectToJob(dep, taskCls, usrObj, ses, jobCtx);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java
index 254f171..660d6ba 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/GridResourceUtils.java
@@ -88,4 +88,19 @@ final class GridResourceUtils {
", target=" + target + ", rsrc=" + rsrc + ']', e);
}
}
+
+ /**
+ * Checks if specified field requires recursive inspection to find resource annotations.
+ *
+ * @param f Field.
+ * @return {@code true} if the field requires recursive inspection, {@code false} otherwise.
+ */
+ static boolean mayRequireResources(Field f) {
+ assert f != null;
+
+ // Need to inspect anonymous classes, callable and runnable instances.
+ return f.getName().startsWith("this$") || f.getName().startsWith("val$") ||
+ Callable.class.isAssignableFrom(f.getType()) || Runnable.class.isAssignableFrom(f.getType()) ||
+ IgniteClosure.class.isAssignableFrom(f.getType());
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/InjectRecursively.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/InjectRecursively.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/InjectRecursively.java
deleted file mode 100644
index 383ee03..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/resource/InjectRecursively.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.resource;
-
-import java.lang.annotation.*;
-
-/**
- * Indicates that resource injection should be performed for field value too.
- */
-@Documented
-@Retention(RetentionPolicy.RUNTIME)
-@Target(ElementType.FIELD)
-public @interface InjectRecursively {
- // No-op.
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/56e67e8f/modules/core/src/main/resources/META-INF/classnames.properties
----------------------------------------------------------------------
diff --git a/modules/core/src/main/resources/META-INF/classnames.properties b/modules/core/src/main/resources/META-INF/classnames.properties
index f130840..b3eed46 100644
--- a/modules/core/src/main/resources/META-INF/classnames.properties
+++ b/modules/core/src/main/resources/META-INF/classnames.properties
@@ -1140,7 +1140,6 @@ org.apache.ignite.internal.util.lang.GridAbsClosure
org.apache.ignite.internal.util.lang.GridAbsClosureX
org.apache.ignite.internal.util.lang.GridCloseableIterator
org.apache.ignite.internal.util.lang.GridClosureException
-org.apache.ignite.internal.util.lang.GridComputeJobWrapper
org.apache.ignite.internal.util.lang.GridFunc$1
org.apache.ignite.internal.util.lang.GridFunc$10
org.apache.ignite.internal.util.lang.GridFunc$100
[05/21] incubator-ignite git commit: #ignite-373: Cache is not empty
after removeAll.
Posted by sb...@apache.org.
#ignite-373: Cache is not empty after removeAll.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/593e3eee
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/593e3eee
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/593e3eee
Branch: refs/heads/ignite-866
Commit: 593e3eeeb0d4965b1c1a83d4f68a9d18e6615632
Parents: 7c91389
Author: ivasilinets <iv...@gridgain.com>
Authored: Thu May 14 15:35:27 2015 +0300
Committer: ivasilinets <iv...@gridgain.com>
Committed: Thu May 14 15:35:27 2015 +0300
----------------------------------------------------------------------
.../processors/cache/GridCacheAdapter.java | 119 +++++------
.../GridDistributedCacheAdapter.java | 210 ++++++++++++-------
.../cache/CacheRemoveAllSelfTest.java | 81 +++++++
.../near/NoneRebalanceModeSelfTest.java | 67 ++++++
.../testsuites/IgniteCacheTestSuite2.java | 1 +
.../testsuites/IgniteCacheTestSuite4.java | 2 +
6 files changed, 338 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 3826bfa..4106cb0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -1133,7 +1133,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null).get();
+ ctx.kernalContext().task().execute(
+ new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null).get();
}
}
@@ -1152,7 +1153,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
if (!nodes.isEmpty()) {
ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.kernalContext().task().execute(new ClearTask(ctx, keys), null);
+ return ctx.kernalContext().task().execute(
+ new ClearTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys), null);
}
else
return new GridFinishedFuture<>();
@@ -3571,7 +3573,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- return ctx.kernalContext().task().execute(new SizeTask(ctx, peekModes), null);
+ return ctx.kernalContext().task().execute(
+ new SizeTask(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), null);
}
/** {@inheritDoc} */
@@ -4827,13 +4830,6 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private static final long serialVersionUID = 0L;
/**
- * Empty constructor for serialization.
- */
- public GlobalClearAllJob() {
- // No-op.
- }
-
- /**
* @param cacheName Cache name.
* @param topVer Affinity topology version.
*/
@@ -4859,14 +4855,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private static final long serialVersionUID = 0L;
/** Keys to remove. */
- private Set<? extends K> keys;
-
- /**
- * Empty constructor for serialization.
- */
- public GlobalClearKeySetJob() {
- // No-op.
- }
+ private final Set<? extends K> keys;
/**
* @param cacheName Cache name.
@@ -4897,14 +4886,7 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
private static final long serialVersionUID = 0L;
/** Peek modes. */
- private CachePeekMode[] peekModes;
-
- /**
- * Required by {@link Externalizable}.
- */
- public SizeJob() {
- // No-op.
- }
+ private final CachePeekMode[] peekModes;
/**
* @param cacheName Cache name.
@@ -5514,17 +5496,10 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
protected Ignite ignite;
/** Affinity topology version. */
- protected AffinityTopologyVersion topVer;
+ protected final AffinityTopologyVersion topVer;
/** Cache name. */
- protected String cacheName;
-
- /**
- * Empty constructor for serialization.
- */
- public TopologyVersionAwareJob() {
- // No-op.
- }
+ protected final String cacheName;
/**
* @param cacheName Cache name.
@@ -5583,24 +5558,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private static final long serialVersionUID = 0L;
- /** Cache context. */
- private GridCacheContext ctx;
+ /** Cache name. */
+ private final String cacheName;
- /** Peek modes. */
- private CachePeekMode[] peekModes;
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
- /**
- * Empty constructor for serialization.
- */
- public SizeTask() {
- // No-op.
- }
+ /** Peek modes. */
+ private final CachePeekMode[] peekModes;
/**
- * @param ctx Cache context.
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
+ * @param peekModes Cache peek modes.
*/
- public SizeTask(GridCacheContext ctx, CachePeekMode[] peekModes) {
- this.ctx = ctx;
+ public SizeTask(String cacheName, AffinityTopologyVersion topVer, CachePeekMode[] peekModes) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
this.peekModes = peekModes;
}
@@ -5610,13 +5584,22 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Map<ComputeJob, ClusterNode> jobs = new HashMap();
for (ClusterNode node : subgrid)
- jobs.put(new SizeJob(ctx.name(), ctx.affinity().affinityTopologyVersion(), peekModes), node);
+ jobs.put(new SizeJob(cacheName, topVer, peekModes), node);
return jobs;
}
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ IgniteException e = res.getException();
+
+ if (e != null) {
+ if (e instanceof ClusterTopologyException)
+ return ComputeJobResultPolicy.WAIT;
+
+ throw new IgniteException("Remote job threw exception.", e);
+ }
+
return ComputeJobResultPolicy.WAIT;
}
@@ -5640,25 +5623,23 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** */
private static final long serialVersionUID = 0L;
- /** Cache context. */
- private GridCacheContext ctx;
+ /** Cache name. */
+ private final String cacheName;
- /** Keys to clear. */
- private Set<? extends K> keys;
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
- /**
- * Empty constructor for serialization.
- */
- public ClearTask() {
- // No-op.
- }
+ /** Keys to clear. */
+ private final Set<? extends K> keys;
/**
- * @param ctx Cache context.
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
* @param keys Keys to clear.
*/
- public ClearTask(GridCacheContext ctx, Set<? extends K> keys) {
- this.ctx = ctx;
+ public ClearTask(String cacheName, AffinityTopologyVersion topVer, Set<? extends K> keys) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
this.keys = keys;
}
@@ -5668,9 +5649,8 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
Map<ComputeJob, ClusterNode> jobs = new HashMap();
for (ClusterNode node : subgrid) {
- jobs.put(keys == null ?
- new GlobalClearAllJob(ctx.name(), ctx.affinity().affinityTopologyVersion()) :
- new GlobalClearKeySetJob<K>(ctx.name(), ctx.affinity().affinityTopologyVersion(), keys),
+ jobs.put(keys == null ? new GlobalClearAllJob(cacheName, topVer) :
+ new GlobalClearKeySetJob<K>(cacheName, topVer, keys),
node);
}
@@ -5679,6 +5659,15 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ IgniteException e = res.getException();
+
+ if (e != null) {
+ if (e instanceof ClusterTopologyException)
+ return ComputeJobResultPolicy.WAIT;
+
+ throw new IgniteException("Remote job threw exception.", e);
+ }
+
return ComputeJobResultPolicy.WAIT;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
index 3a685cc..c5ef22f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/distributed/GridDistributedCacheAdapter.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache.distributed;
import org.apache.ignite.*;
import org.apache.ignite.cluster.*;
+import org.apache.ignite.compute.*;
import org.apache.ignite.internal.*;
import org.apache.ignite.internal.cluster.*;
import org.apache.ignite.internal.processors.affinity.*;
@@ -30,17 +31,17 @@ import org.apache.ignite.internal.processors.cache.version.*;
import org.apache.ignite.internal.processors.datastreamer.*;
import org.apache.ignite.internal.processors.task.*;
import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
import org.apache.ignite.internal.util.typedef.internal.*;
import org.apache.ignite.lang.*;
-import org.apache.ignite.resources.*;
import org.apache.ignite.transactions.*;
import org.jetbrains.annotations.*;
import java.io.*;
import java.util.*;
-import java.util.concurrent.*;
-import static org.apache.ignite.internal.GridClosureCallMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+import static org.apache.ignite.internal.processors.task.GridTaskThreadContextKey.*;
/**
* Distributed cache implementation.
@@ -142,21 +143,28 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
try {
AffinityTopologyVersion topVer;
+ boolean retry;
+
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ boolean skipStore = opCtx != null && opCtx.skipStore();
+
do {
+ retry = false;
+
topVer = ctx.affinity().affinityTopologyVersion();
// Send job to all data nodes.
Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- ctx.closures().callAsyncNoFailover(BROADCAST,
- new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes,
- true).get();
+ retry = !ctx.kernalContext().task().execute(
+ new RemoveAllTask(ctx.name(), topVer, skipStore), null).get();
}
}
- while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) > 0);
+ while (ctx.affinity().affinityTopologyVersion().compareTo(topVer) != 0 || retry);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -170,7 +178,11 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
AffinityTopologyVersion topVer = ctx.affinity().affinityTopologyVersion();
- removeAllAsync(opFut, topVer);
+ CacheOperationContext opCtx = ctx.operationContextPerCall();
+
+ boolean skipStore = opCtx != null && opCtx.skipStore();
+
+ removeAllAsync(opFut, topVer, skipStore);
return opFut;
}
@@ -178,27 +190,29 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
/**
* @param opFut Future.
* @param topVer Topology version.
+ * @param skipStore Skip store flag.
*/
- private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer) {
+ private void removeAllAsync(final GridFutureAdapter<Void> opFut, final AffinityTopologyVersion topVer,
+ final boolean skipStore) {
Collection<ClusterNode> nodes = ctx.grid().cluster().forDataNodes(name()).nodes();
if (!nodes.isEmpty()) {
- CacheOperationContext opCtx = ctx.operationContextPerCall();
+ ctx.kernalContext().task().setThreadContext(TC_SUBGRID, nodes);
- IgniteInternalFuture<?> rmvFut = ctx.closures().callAsyncNoFailover(BROADCAST,
- new GlobalRemoveAllCallable<>(name(), topVer, opCtx != null && opCtx.skipStore()), nodes, true);
+ IgniteInternalFuture<Boolean> rmvAll = ctx.kernalContext().task().execute(
+ new RemoveAllTask(ctx.name(), topVer, skipStore), null);
- rmvFut.listen(new IgniteInClosure<IgniteInternalFuture<?>>() {
- @Override public void apply(IgniteInternalFuture<?> fut) {
+ rmvAll.listen(new IgniteInClosure<IgniteInternalFuture<Boolean>>() {
+ @Override public void apply(IgniteInternalFuture<Boolean> fut) {
try {
- fut.get();
+ boolean retry = !fut.get();
AffinityTopologyVersion topVer0 = ctx.affinity().affinityTopologyVersion();
- if (topVer0.equals(topVer))
+ if (topVer0.equals(topVer) && !retry)
opFut.onDone();
else
- removeAllAsync(opFut, topVer0);
+ removeAllAsync(opFut, topVer0, skipStore);
}
catch (ClusterGroupEmptyCheckedException ignore) {
if (log.isDebugEnabled())
@@ -227,97 +241,150 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
}
/**
- * Internal callable which performs remove all primary key mappings
- * operation on a cache with the given name.
+ * Task that maps a remove-all job to every node of the subgrid and reduces the job results to a success flag.
*/
@GridInternal
- private static class GlobalRemoveAllCallable<K,V> implements Callable<Object>, Externalizable {
+ private static class RemoveAllTask extends ComputeTaskAdapter<Object, Boolean> {
/** */
private static final long serialVersionUID = 0L;
/** Cache name. */
- private String cacheName;
+ private final String cacheName;
- /** Topology version. */
- private AffinityTopologyVersion topVer;
+ /** Affinity topology version. */
+ private final AffinityTopologyVersion topVer;
/** Skip store flag. */
- private boolean skipStore;
-
- /** Injected grid instance. */
- @IgniteInstanceResource
- private Ignite ignite;
+ private final boolean skipStore;
/**
- * Empty constructor for serialization.
+ * @param cacheName Cache name.
+ * @param topVer Affinity topology version.
+ * @param skipStore Skip store flag.
*/
- public GlobalRemoveAllCallable() {
- // No-op.
+ public RemoveAllTask(String cacheName, AffinityTopologyVersion topVer, boolean skipStore) {
+ this.cacheName = cacheName;
+ this.topVer = topVer;
+ this.skipStore = skipStore;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid,
+ @Nullable Object arg) throws IgniteException {
+ Map<ComputeJob, ClusterNode> jobs = new HashMap();
+
+ for (ClusterNode node : subgrid)
+ jobs.put(new GlobalRemoveAllJob(cacheName, topVer, skipStore), node);
+
+ return jobs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ComputeJobResultPolicy result(ComputeJobResult res, List<ComputeJobResult> rcvd) {
+ IgniteException e = res.getException();
+
+ if (e != null) {
+ if (e instanceof ClusterTopologyException)
+ return ComputeJobResultPolicy.WAIT;
+
+ throw new IgniteException("Remote job threw exception.", e);
+ }
+
+ return ComputeJobResultPolicy.WAIT;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Boolean reduce(List<ComputeJobResult> results) throws IgniteException {
+ for (ComputeJobResult locRes : results) {
+ if (locRes != null && (locRes.getException() != null || !locRes.<Boolean>getData()))
+ return false;
+ }
+
+ return true;
}
+ }
+ /**
+ * Internal job which performs remove all primary key mappings
+ * operation on a cache with the given name.
+ */
+ @GridInternal
+ private static class GlobalRemoveAllJob<K,V> extends TopologyVersionAwareJob {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Skip store flag. */
+ private final boolean skipStore;
/**
* @param cacheName Cache name.
* @param topVer Topology version.
* @param skipStore Skip store flag.
*/
- private GlobalRemoveAllCallable(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
- this.cacheName = cacheName;
- this.topVer = topVer;
+ private GlobalRemoveAllJob(String cacheName, @NotNull AffinityTopologyVersion topVer, boolean skipStore) {
+ super(cacheName, topVer);
+
this.skipStore = skipStore;
}
- /**
- * {@inheritDoc}
- */
- @Override public Object call() throws Exception {
- GridCacheAdapter<K, V> cacheAdapter = ((IgniteKernal)ignite).context().cache().internalCache(cacheName);
+ /** {@inheritDoc} */
+ @Nullable @Override public Object localExecute(@Nullable IgniteInternalCache cache0) {
+ GridCacheAdapter cache = ((IgniteKernal) ignite).context().cache().internalCache(cacheName);
- final GridCacheContext<K, V> ctx = cacheAdapter.context();
+ if (cache == null)
+ return true;
- ctx.affinity().affinityReadyFuture(topVer).get();
+ final GridCacheContext<K, V> ctx = cache.context();
ctx.gate().enter();
try {
if (!ctx.affinity().affinityTopologyVersion().equals(topVer))
- return null; // Ignore this remove request because remove request will be sent again.
+ return false; // Ignore this remove request because remove request will be sent again.
GridDhtCacheAdapter<K, V> dht;
GridNearCacheAdapter<K, V> near = null;
- if (cacheAdapter instanceof GridNearCacheAdapter) {
- near = ((GridNearCacheAdapter<K, V>)cacheAdapter);
+ if (cache instanceof GridNearCacheAdapter) {
+ near = ((GridNearCacheAdapter<K, V>) cache);
dht = near.dht();
}
else
- dht = (GridDhtCacheAdapter<K, V>)cacheAdapter;
+ dht = (GridDhtCacheAdapter<K, V>) cache;
try (DataStreamerImpl<KeyCacheObject, Object> dataLdr =
- (DataStreamerImpl)ignite.dataStreamer(cacheName)) {
- ((DataStreamerImpl)dataLdr).maxRemapCount(0);
+ (DataStreamerImpl) ignite.dataStreamer(cacheName)) {
+ ((DataStreamerImpl) dataLdr).maxRemapCount(0);
dataLdr.skipStore(skipStore);
dataLdr.receiver(DataStreamerCacheUpdaters.<KeyCacheObject, Object>batched());
- for (GridDhtLocalPartition locPart : dht.topology().currentLocalPartitions()) {
- if (!locPart.isEmpty() && locPart.primary(topVer)) {
- for (GridDhtCacheEntry o : locPart.entries()) {
- if (!o.obsoleteOrDeleted())
- dataLdr.removeDataInternal(o.key());
- }
- }
- }
+ for (int part : ctx.affinity().primaryPartitions(ctx.localNodeId(), topVer)) {
+ GridDhtLocalPartition locPart = dht.topology().localPartition(part, topVer, false);
- Iterator<KeyCacheObject> it = dht.context().swap().offHeapKeyIterator(true, false, topVer);
+ if (locPart == null || (ctx.rebalanceEnabled() && locPart.state() != OWNING) || !locPart.reserve())
+ return false;
- while (it.hasNext())
- dataLdr.removeDataInternal(it.next());
+ try {
+ if (!locPart.isEmpty()) {
+ for (GridDhtCacheEntry o : locPart.entries()) {
+ if (!o.obsoleteOrDeleted())
+ dataLdr.removeDataInternal(o.key());
+ }
+ }
- it = dht.context().swap().swapKeyIterator(true, false, topVer);
+ GridCloseableIterator<Map.Entry<byte[], GridCacheSwapEntry>> iter =
+ dht.context().swap().iterator(part);
- while (it.hasNext())
- dataLdr.removeDataInternal(it.next());
+ if (iter != null) {
+ for (Map.Entry<byte[], GridCacheSwapEntry> e : iter)
+ dataLdr.removeDataInternal(ctx.toCacheKeyObject(e.getKey()));
+ }
+ }
+ finally {
+ locPart.release();
+ }
+ }
}
if (near != null) {
@@ -329,25 +396,14 @@ public abstract class GridDistributedCacheAdapter<K, V> extends GridCacheAdapter
}
}
}
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
finally {
ctx.gate().leave();
}
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- U.writeString(out, cacheName);
- out.writeObject(topVer);
- out.writeBoolean(skipStore);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cacheName = U.readString(in);
- topVer = (AffinityTopologyVersion)in.readObject();
- skipStore = in.readBoolean();
+ return true;
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
new file mode 100644
index 0000000..f5de96f
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/CacheRemoveAllSelfTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.testframework.*;
+
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+/**
+ * Tests that {@code removeAll} leaves every node's cache empty while new nodes are being started concurrently.
+ */
+public class CacheRemoveAllSelfTest extends GridCacheAbstractSelfTest {
+ /** {@inheritDoc} */
+ @Override protected long getTestTimeout() {
+ return 60000;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int gridCount() {
+ return 4;
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoveAll() throws Exception {
+ IgniteCache<Integer, String> cache = grid(0).cache(null);
+
+ for (int i = 0; i < 10_000; ++i)
+ cache.put(i, "val");
+
+ final AtomicInteger igniteId = new AtomicInteger(gridCount());
+
+ IgniteInternalFuture fut = GridTestUtils.runMultiThreadedAsync(new Callable<Object>() {
+ @Override public Object call() throws Exception {
+ for (int i = 0; i < 2; ++i)
+ startGrid(igniteId.getAndIncrement());
+
+ return true;
+ }
+ }, 3, "start-node-thread");
+
+ cache.removeAll();
+
+ fut.get();
+
+ U.sleep(5000);
+
+ for (int i = 0; i < igniteId.get(); ++i) {
+ IgniteCache locCache = grid(i).cache(null);
+
+ assertEquals("Local size: " + locCache.localSize() + "\n" +
+ "On heap: " + locCache.localSize(CachePeekMode.ONHEAP) + "\n" +
+ "Off heap: " + locCache.localSize(CachePeekMode.OFFHEAP) + "\n" +
+ "Swap: " + locCache.localSize(CachePeekMode.SWAP) + "\n" +
+ "Primary: " + locCache.localSize(CachePeekMode.PRIMARY) + "\n" +
+ "Backup: " + locCache.localSize(CachePeekMode.BACKUP),
+ 0, locCache.localSize());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
new file mode 100644
index 0000000..d61ddcc
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/distributed/near/NoneRebalanceModeSelfTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.cache.distributed.near;
+
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.cache.distributed.dht.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import static org.apache.ignite.cache.CacheRebalanceMode.*;
+import static org.apache.ignite.internal.processors.cache.distributed.dht.GridDhtPartitionState.*;
+
+/**
+ * Tests {@code removeAll} on a cache configured with the {@code NONE} rebalance mode.
+ */
+public class NoneRebalanceModeSelfTest extends GridCommonAbstractTest {
+ /** {@inheritDoc} */
+ @SuppressWarnings({"ConstantConditions"})
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration c = super.getConfiguration(gridName);
+
+ CacheConfiguration cc = defaultCacheConfiguration();
+
+ cc.setRebalanceMode(NONE);
+
+ c.setCacheConfiguration(cc);
+
+ return c;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ startGrid(0);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testRemoveAll() throws Exception {
+ GridNearTransactionalCache cache = (GridNearTransactionalCache)((IgniteKernal)grid(0)).internalCache(null);
+
+ for (GridDhtLocalPartition part : cache.dht().topology().localPartitions())
+ assertEquals(MOVING, part.state());
+
+ grid(0).cache(null).removeAll();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
index dc3a2c0..5738778 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite2.java
@@ -58,6 +58,7 @@ public class IgniteCacheTestSuite2 extends TestSuite {
suite.addTestSuite(GridCachePartitionedGetSelfTest.class);
suite.addTest(new TestSuite(GridCachePartitionedBasicApiTest.class));
suite.addTest(new TestSuite(GridCacheNearMultiGetSelfTest.class));
+ suite.addTest(new TestSuite(NoneRebalanceModeSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearJobExecutionSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearOneNodeSelfTest.class));
suite.addTest(new TestSuite(GridCacheNearMultiNodeSelfTest.class));
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/593e3eee/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
index 8eb0688..aaf7e5b 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteCacheTestSuite4.java
@@ -126,6 +126,8 @@ public class IgniteCacheTestSuite4 extends TestSuite {
suite.addTestSuite(CacheNoValueClassOnServerNodeTest.class);
+ suite.addTestSuite(CacheRemoveAllSelfTest.class);
+
suite.addTestSuite(CacheOffheapMapEntrySelfTest.class);
return suite;
[14/21] incubator-ignite git commit: ignite-430 review
Posted by sb...@apache.org.
ignite-430 review
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/fe78d42c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/fe78d42c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/fe78d42c
Branch: refs/heads/ignite-866
Commit: fe78d42cfe77626b7250875e0c3f608b27dc9367
Parents: 7ee8517
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Thu May 14 16:36:06 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:37 2015 +0300
----------------------------------------------------------------------
.../socket/WordsSocketStreamerClient.java | 86 --------------
.../socket/ZWordsSocketStreamerClient.java | 81 --------------
.../socket/ZWordsSocketStreamerServer.java | 111 -------------------
.../examples/streaming/socket/package-info.java | 21 ----
.../streaming/wordcount/QueryWords.java | 6 +
.../streaming/wordcount/StreamWords.java | 6 +
.../socket/WordsSocketStreamerClient.java | 82 ++++++++++++++
.../socket/WordsSocketStreamerServer.java | 111 +++++++++++++++++++
.../wordcount/socket/package-info.java | 21 ++++
9 files changed, 226 insertions(+), 299 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
deleted file mode 100644
index c5ec079..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * Sends words to socket server based on {@link SocketStreamer} using message size based protocol.
- * <p>
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
- * <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
- * <li>Start querying popular words using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class WordsSocketStreamerClient {
- /** Port. */
- private static final int PORT = 5555;
-
- /**
- * @param args Args.
- */
- public static void main(String[] args) throws IOException {
- InetAddress addr = InetAddress.getLocalHost();
-
- try (Socket sock = new Socket(addr, PORT);
- OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
-
- System.out.println("Words streaming started.");
-
- while (true) {
- try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
- LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
- for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
- for (String word : line.split(" ")) {
- if (!word.isEmpty()) {
- // Stream words into Ignite through socket.
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(bos)) {
-
- // Write message
- out.writeObject(word);
-
- byte[] arr = bos.toByteArray();
-
- // Write message length
- oos.write(arr.length >>> 24);
- oos.write(arr.length >>> 16);
- oos.write(arr.length >>> 8);
- oos.write(arr.length);
-
- oos.write(arr);
- }
- }
- }
- }
- }
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
deleted file mode 100644
index c17ccdc..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-
-/**
- * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
- * Example illustrates usage of TCP socket streamer in case of non-Java clients.
- * In this example words are zero-terminated strings.
- * <p>
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
- * <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
- * <li>Start querying popular words using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class ZWordsSocketStreamerClient {
- /** Port. */
- private static final int PORT = 5555;
-
- /** Delimiter. */
- private static final byte[] DELIM = new byte[] {0};
-
- /**
- * @param args Args.
- */
- public static void main(String[] args) throws IOException {
- InetAddress addr = InetAddress.getLocalHost();
-
- try (Socket sock = new Socket(addr, PORT);
- OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
-
- System.out.println("Words streaming started.");
-
- while (true) {
- try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
- LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
- for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
- for (String word : line.split(" ")) {
- if (!word.isEmpty()) {
- // Stream words into Ignite through socket.
- byte[] arr = word.getBytes("ASCII");
-
- // Write message
- oos.write(arr);
-
- // Write message delimiter
- oos.write(DELIM);
- }
- }
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
deleted file mode 100644
index a0ef9da..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
- * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
- * In this example words are zero-terminated strings.
- * <p>
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
- * <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
- * <li>Start querying popular words using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class ZWordsSocketStreamerServer {
- /** Port. */
- private static final int PORT = 5555;
-
- /** Delimiter. */
- private static final byte[] DELIM = new byte[] {0};
-
- /**
- * @param args Args.
- */
- public static void main(String[] args) throws InterruptedException, IOException {
- // Mark this cluster member as client.
- Ignition.setClientMode(true);
-
- Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
-
- if (!ExamplesUtils.hasServerNodes(ignite)) {
- ignite.close();
-
- return;
- }
-
- // The cache is configured with sliding window holding 1 second of the streaming data.
- IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
- IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
-
- InetAddress addr = InetAddress.getLocalHost();
-
- // Configure socket streamer
- SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
-
- sockStmr.setAddr(addr);
-
- sockStmr.setPort(PORT);
-
- sockStmr.setDelimiter(DELIM);
-
- sockStmr.setIgnite(ignite);
-
- sockStmr.setStreamer(stmr);
-
- // Converter from zero-terminated string to Java strings.
- sockStmr.setConverter(new SocketMessageConverter<String>() {
- @Override public String convert(byte[] msg) {
- try {
- return new String(msg, "ASCII");
- }
- catch (UnsupportedEncodingException e) {
- throw new IgniteException(e);
- }
- }
- });
-
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
- @Override public Map.Entry<AffinityUuid, String> extract(String word) {
- // By using AffinityUuid we ensure that identical
- // words are processed on the same cluster node.
- return new IgniteBiTuple<>(new AffinityUuid(word), word);
- }
- });
-
- sockStmr.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
deleted file mode 100644
index c516ab4..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains {@link org.apache.ignite.stream.socket.SocketStreamer} usage examples.
- */
-package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
index 149aa79..faf8b51 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -36,6 +36,12 @@ import java.util.*;
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class QueryWords {
+ /**
+ * Schedules words query execution.
+ *
+ * @param args Command line arguments (none required).
+ * @throws Exception If failed.
+ */
public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
index cc3c0cb..26be178 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -35,6 +35,12 @@ import java.io.*;
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class StreamWords {
+ /**
+ * Starts words streaming.
+ *
+ * @param args Command line arguments (none required).
+ * @throws Exception If failed.
+ */
public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
new file mode 100644
index 0000000..ea3beaa
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount.socket;
+
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
+ * Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerClient {
+ /** Port. */
+ private static final int PORT = 5555;
+
+ /** Delimiter. */
+ private static final byte[] DELIM = new byte[] {0};
+
+ /**
+ * @param args Command line arguments (none required).
+ */
+ public static void main(String[] args) throws IOException {
+ InetAddress addr = InetAddress.getLocalHost();
+
+ try (
+ Socket sock = new Socket(addr, PORT);
+ OutputStream oos = new BufferedOutputStream(sock.getOutputStream())
+ ) {
+ System.out.println("Words streaming started.");
+
+ while (true) {
+ try (InputStream in = StreamWords.class.getResourceAsStream("alice-in-wonderland.txt");
+ LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+ for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+ for (String word : line.split(" ")) {
+ if (!word.isEmpty()) {
+ // Encode the word as ASCII bytes; the server decodes with the same charset.
+ byte[] arr = word.getBytes("ASCII");
+
+ // Write message
+ oos.write(arr);
+
+ // Write message delimiter
+ oos.write(DELIM);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
new file mode 100644
index 0000000..259c925
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.wordcount.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
+ * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerServer {
+ /** Port. */
+ private static final int PORT = 5555;
+
+ /** Delimiter. */
+ private static final byte[] DELIM = new byte[] {0};
+
+ /**
+ * @param args Command line arguments (none required).
+ */
+ public static void main(String[] args) throws InterruptedException, IOException {
+ // Mark this cluster member as client.
+ Ignition.setClientMode(true);
+
+ Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
+
+ if (!ExamplesUtils.hasServerNodes(ignite)) {
+ ignite.close();
+
+ return;
+ }
+
+ // The cache is configured with sliding window holding 1 second of the streaming data.
+ IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+ IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
+
+ InetAddress addr = InetAddress.getLocalHost();
+
+ // Configure socket streamer: local address, port, message delimiter, Ignite instance and data streamer.
+ SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
+
+ sockStmr.setAddr(addr);
+
+ sockStmr.setPort(PORT);
+
+ sockStmr.setDelimiter(DELIM);
+
+ sockStmr.setIgnite(ignite);
+
+ sockStmr.setStreamer(stmr);
+
+ // Converts each delimited message from ASCII bytes to a Java string.
+ sockStmr.setConverter(new SocketMessageConverter<String>() {
+ @Override public String convert(byte[] msg) {
+ try {
+ return new String(msg, "ASCII");
+ }
+ catch (UnsupportedEncodingException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+ // By using AffinityUuid we ensure that identical
+ // words are processed on the same cluster node.
+ return new IgniteBiTuple<>(new AffinityUuid(word), word);
+ }
+ });
+
+ sockStmr.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/fe78d42c/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/package-info.java
new file mode 100644
index 0000000..048299f
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains {@link org.apache.ignite.stream.socket.SocketStreamer} usage examples.
+ */
+package org.apache.ignite.examples.streaming.wordcount.socket;
[19/21] incubator-ignite git commit: # removed cleaning of IpFinder
by coordinator on topology changes.
Posted by sb...@apache.org.
# removed cleaning of IpFinder by coordinator on topology changes.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/ee408190
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/ee408190
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/ee408190
Branch: refs/heads/ignite-866
Commit: ee40819088fe3724ad7e7cad66d501d7db237102
Parents: 7fa8abc
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 15 17:53:28 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 15 17:53:28 2015 +0300
----------------------------------------------------------------------
.../spi/discovery/tcp/TcpDiscoverySpi.java | 26 --------------------
1 file changed, 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/ee408190/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index 1dea37a..ed0e9dd 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -3952,18 +3952,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
long topVer;
if (locNodeCoord) {
- if (!msg.client() && ipFinder.isShared()) {
- try {
- ipFinder.unregisterAddresses(leftNode.socketAddresses());
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to unregister left node address: " + leftNode);
-
- onException("Failed to unregister left node address: " + leftNode, e);
- }
- }
-
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
@@ -4131,20 +4119,6 @@ public class TcpDiscoverySpi extends TcpDiscoverySpiAdapter implements TcpDiscov
long topVer;
if (locNodeCoord) {
- if (!node.isClient() && ipFinder.isShared()) {
- try {
- ipFinder.unregisterAddresses(node.socketAddresses());
- }
- catch (IgniteSpiException e) {
- if (log.isDebugEnabled())
- log.debug("Failed to unregister failed node address [node=" + node +
- ", err=" + e.getMessage() + ']');
-
- onException("Failed to unregister failed node address [node=" + node +
- ", err=" + e.getMessage() + ']', e);
- }
- }
-
topVer = ring.incrementTopologyVersion();
msg.topologyVersion(topVer);
[10/21] incubator-ignite git commit: # ignite-669 - streaming design.
Posted by sb...@apache.org.
# ignite-669 - streaming design.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/b0fbfa05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/b0fbfa05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/b0fbfa05
Branch: refs/heads/ignite-866
Commit: b0fbfa05cb1cb712fe217da7166d945c7c2ec597
Parents: 877eb76
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Thu Apr 2 03:13:04 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:24 2015 +0300
----------------------------------------------------------------------
.../main/java/org/apache/ignite/stream/adapters/StreamAdapter.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/b0fbfa05/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index 9d4772f..c729362 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -37,7 +37,7 @@ public abstract class StreamAdapter<T, K, V> {
/**
* Empty constructor.
*/
- public StreamAdapter() {
+ protected StreamAdapter() {
// No-op.
}
[21/21] incubator-ignite git commit: ignite-866 NPE during clean up
Posted by sb...@apache.org.
ignite-866 NPE during clean up
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/d87a38c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/d87a38c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/d87a38c3
Branch: refs/heads/ignite-866
Commit: d87a38c3151ecfe0ee512516764f54a115d48abb
Parents: 94e202a
Author: agura <ag...@gridgain.com>
Authored: Thu May 14 14:40:38 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 18:48:14 2015 +0300
----------------------------------------------------------------------
.../processors/cache/CacheMetricsImpl.java | 62 +++++++++++++++-----
.../processors/cache/GridCacheAdapter.java | 12 +++-
.../processors/cache/GridCacheContext.java | 8 +--
3 files changed, 59 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87a38c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
index 560de97..af19077 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/CacheMetricsImpl.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.processors.cache;
import org.apache.ignite.*;
import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
import org.apache.ignite.internal.processors.cache.store.*;
import org.apache.ignite.internal.util.tostring.*;
import org.apache.ignite.internal.util.typedef.internal.*;
@@ -118,7 +119,9 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public long getOverflowSize() {
try {
- return cctx.cache().overflowSize();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.overflowSize() : -1;
}
catch (IgniteCheckedException ignored) {
return -1;
@@ -127,34 +130,47 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public long getOffHeapEntriesCount() {
- return cctx.cache().offHeapEntriesCount();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.offHeapEntriesCount() : -1;
}
/** {@inheritDoc} */
@Override public long getOffHeapAllocatedSize() {
- return cctx.cache().offHeapAllocatedSize();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.offHeapAllocatedSize() : -1;
}
/** {@inheritDoc} */
@Override public int getSize() {
- return cctx.cache().size();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache != null ? cache.size() : 0;
}
/** {@inheritDoc} */
@Override public int getKeySize() {
- return cctx.cache().size();
+ return getSize();
}
/** {@inheritDoc} */
@Override public boolean isEmpty() {
- return cctx.cache().isEmpty();
+ GridCacheAdapter<?, ?> cache = cctx.cache();
+
+ return cache == null || cache.isEmpty();
}
/** {@inheritDoc} */
@Override public int getDhtEvictQueueCurrentSize() {
- return cctx.isNear() ?
- dhtCtx != null ? dhtCtx.evicts().evictQueueSize() : -1
- : cctx.evicts().evictQueueSize();
+ GridCacheContext<?, ?> ctx = cctx.isNear() ? dhtCtx : cctx;
+
+ if (ctx == null)
+ return -1;
+
+ GridCacheEvictionManager evictMgr = ctx.evicts();
+
+ return evictMgr != null ? evictMgr.evictQueueSize() : -1;
}
/** {@inheritDoc} */
@@ -548,37 +564,51 @@ public class CacheMetricsImpl implements CacheMetrics {
/** {@inheritDoc} */
@Override public String getKeyType() {
- return cctx.config().getKeyType().getName();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null ? ccfg.getKeyType().getName() : null;
}
/** {@inheritDoc} */
@Override public String getValueType() {
- return cctx.config().getValueType().getName();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null ? ccfg.getValueType().getName() : null;
}
/** {@inheritDoc} */
@Override public boolean isReadThrough() {
- return cctx.config().isReadThrough();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isReadThrough();
}
/** {@inheritDoc} */
@Override public boolean isWriteThrough() {
- return cctx.config().isWriteThrough();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isWriteThrough();
}
/** {@inheritDoc} */
@Override public boolean isStoreByValue() {
- return cctx.config().isStoreByValue();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isStoreByValue();
}
/** {@inheritDoc} */
@Override public boolean isStatisticsEnabled() {
- return cctx.config().isStatisticsEnabled();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isStatisticsEnabled();
}
/** {@inheritDoc} */
@Override public boolean isManagementEnabled() {
- return cctx.config().isManagementEnabled();
+ CacheConfiguration ccfg = cctx.config();
+
+ return ccfg != null && ccfg.isManagementEnabled();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87a38c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
index 4106cb0..5573e7b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheAdapter.java
@@ -3249,7 +3249,9 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public long overflowSize() throws IgniteCheckedException {
- return ctx.swap().swapSize();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.swapSize() : -1;
}
/**
@@ -3802,12 +3804,16 @@ public abstract class GridCacheAdapter<K, V> implements IgniteInternalCache<K, V
/** {@inheritDoc} */
@Override public long offHeapEntriesCount() {
- return ctx.swap().offHeapEntriesCount();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.offHeapEntriesCount() : -1;
}
/** {@inheritDoc} */
@Override public long offHeapAllocatedSize() {
- return ctx.swap().offHeapAllocatedSize();
+ GridCacheSwapManager swapMgr = ctx.swap();
+
+ return swapMgr != null ? swapMgr.offHeapAllocatedSize() : -1;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d87a38c3/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
index 2eeaed6..10b8235 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/GridCacheContext.java
@@ -98,7 +98,7 @@ public class GridCacheContext<K, V> implements Externalizable {
private IgniteLogger log;
/** Cache configuration. */
- private CacheConfiguration cacheCfg;
+ private volatile CacheConfiguration cacheCfg;
/** Unsafe memory object for direct memory allocation. */
private GridUnsafeMemory unsafeMemory;
@@ -116,10 +116,10 @@ public class GridCacheContext<K, V> implements Externalizable {
private CacheContinuousQueryManager contQryMgr;
/** Swap manager. */
- private GridCacheSwapManager swapMgr;
+ private volatile GridCacheSwapManager swapMgr;
/** Evictions manager. */
- private GridCacheEvictionManager evictMgr;
+ private volatile GridCacheEvictionManager evictMgr;
/** Data structures manager. */
private CacheDataStructuresManager dataStructuresMgr;
@@ -149,7 +149,7 @@ public class GridCacheContext<K, V> implements Externalizable {
private GridCacheGateway<K, V> gate;
/** Grid cache. */
- private GridCacheAdapter<K, V> cache;
+ private volatile GridCacheAdapter<K, V> cache;
/** Cached local rich node. */
private ClusterNode locNode;
[02/21] incubator-ignite git commit: Merge branch 'ignite-timeout'
into ignite-sprint-4
Posted by sb...@apache.org.
Merge branch 'ignite-timeout' into ignite-sprint-4
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/478cc7bf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/478cc7bf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/478cc7bf
Branch: refs/heads/ignite-866
Commit: 478cc7bfe8a507982ac0ad9fd7d3370d19a936fa
Parents: b95d2ae de1c80e
Author: nikolay_tikhonov <nt...@gridgain.com>
Authored: Tue May 12 17:46:50 2015 +0300
Committer: nikolay_tikhonov <nt...@gridgain.com>
Committed: Tue May 12 17:46:50 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[11/21] incubator-ignite git commit: ignite-430 Implement
IgniteSocketStreamer to stream data from TCP socket.
Posted by sb...@apache.org.
ignite-430 Implement IgniteSocketStreamer to stream data from TCP socket.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/53995dcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/53995dcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/53995dcb
Branch: refs/heads/ignite-866
Commit: 53995dcb3470de07df73d8dfd284da0dbb8df8dd
Parents: b0fbfa0
Author: agura <ag...@gridgain.com>
Authored: Mon Apr 13 18:28:40 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:27 2015 +0300
----------------------------------------------------------------------
.../streaming/socket/SocketStreamerExample.java | 158 ++++++++++
.../socket/ZStringsSocketStreamerExample.java | 151 +++++++++
.../examples/streaming/socket/package-info.java | 21 ++
.../internal/util/nio/GridBufferedParser.java | 4 -
.../internal/util/nio/GridDelimitedParser.java | 91 ++++++
.../util/nio/GridNioDelimitedBuffer.java | 106 +++++++
.../ignite/stream/adapters/StreamAdapter.java | 17 +
.../stream/socket/IgniteSocketStreamer.java | 217 +++++++++++++
.../stream/socket/SocketMessageConverter.java | 31 ++
.../ignite/stream/socket/package-info.java | 21 ++
.../util/nio/GridNioDelimitedBufferTest.java | 112 +++++++
.../socket/IgniteSocketStreamerSelfTest.java | 315 +++++++++++++++++++
.../ignite/stream/socket/package-info.java | 21 ++
.../testsuites/IgniteStreamTestSuite.java | 39 +++
.../testsuites/IgniteUtilSelfTestSuite.java | 1 +
15 files changed, 1301 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
new file mode 100644
index 0000000..73cb970
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.stream.socket.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Streams random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start streaming using {@link SocketStreamerExample}.</li>
+ * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class SocketStreamerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of the streaming data.
+            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the same number.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
+                InetAddress addr = InetAddress.getLocalHost();
+
+                IgniteSocketStreamer<Tuple, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+
+                sockStmr.setAddr(addr);
+
+                sockStmr.setPort(PORT);
+
+                sockStmr.setIgnite(ignite);
+
+                sockStmr.setStreamer(stmr);
+
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, Long>() {
+                    @Override public Map.Entry<Integer, Long> extract(Tuple tuple) {
+                        return new IgniteBiTuple<>(tuple.key, tuple.cnt);
+                    }
+                });
+
+                sockStmr.start();
+
+                sendData(addr, PORT);
+            }
+        }
+    }
+
+    /**
+     * Endlessly sends serialized tuples, each prefixed with its length as 4 big-endian bytes.
+     * @param addr Address.
+     * @param port Port.
+     */
+    private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
+        try (Socket sock = new Socket(addr, port);
+             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+            while (true) {
+                try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+                     ObjectOutputStream out = new ObjectOutputStream(bos)) {
+                    out.writeObject(new Tuple(RAND.nextInt(RANGE), 1L));
+
+                    // ObjectOutputStream may buffer internally: flush before taking the bytes.
+                    out.flush();
+                    byte[] arr = bos.toByteArray();
+
+                    oos.write(arr.length >>> 24);
+                    oos.write(arr.length >>> 16);
+                    oos.write(arr.length >>> 8);
+                    oos.write(arr.length);
+                    oos.write(arr);
+                }
+            }
+        }
+    }
+
+    /**
+     * Tuple.
+     */
+    private static class Tuple implements Serializable {
+        /** Serial version uid. */
+        private static final long serialVersionUID = 0;
+
+        /** Key. */
+        private final int key;
+
+        /** Count. */
+        private final long cnt;
+
+        /**
+         * @param key Key.
+         * @param cnt Count.
+         */
+        public Tuple(int key, long cnt) {
+            this.key = key;
+            this.cnt = cnt;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
new file mode 100644
index 0000000..a535c73
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.numbers.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.stream.socket.*;
+
+import javax.cache.processor.*;
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Stream random numbers into the streaming cache using {@link IgniteSocketStreamer}.
+ * <p>
+ * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams
+ * zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
+ * <li>Start querying popular numbers using {@link QueryPopularNumbers}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZStringsSocketStreamerExample {
+    /** Random number generator. */
+    private static final Random RAND = new Random();
+
+    /** Range within which to generate numbers. */
+    private static final int RANGE = 1000;
+
+    /** Port. */
+    private static final int PORT = 5555;
+
+    /** Delimiter. */
+    private static final byte[] DELIM = new byte[] {0};
+
+    /**
+     * @param args Args.
+     */
+    public static void main(String[] args) throws InterruptedException, IOException {
+        // Mark this cluster member as client.
+        Ignition.setClientMode(true);
+
+        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
+            if (!ExamplesUtils.hasServerNodes(ignite))
+                return;
+
+            // The cache is configured with sliding window holding 1 second of the streaming data.
+            IgniteCache<Integer, Long> stmCache = ignite.getOrCreateCache(CacheConfig.randomNumbersCache());
+
+            try (IgniteDataStreamer<Integer, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
+                // Allow data updates.
+                stmr.allowOverwrite(true);
+
+                // Configure data transformation to count instances of the same number.
+                stmr.receiver(new StreamTransformer<Integer, Long>() {
+                    @Override public Object process(MutableEntry<Integer, Long> e, Object... objects)
+                        throws EntryProcessorException {
+                        Long val = e.getValue();
+
+                        e.setValue(val == null ? 1L : val + 1);
+
+                        return null;
+                    }
+                });
+
+                InetAddress addr = InetAddress.getLocalHost();
+
+                IgniteSocketStreamer<String, Integer, Long> sockStmr = new IgniteSocketStreamer<>();
+
+                sockStmr.setAddr(addr);
+
+                sockStmr.setPort(PORT);
+
+                sockStmr.setDelimiter(DELIM);
+
+                sockStmr.setIgnite(ignite);
+
+                sockStmr.setStreamer(stmr);
+
+                // Converter from zero-terminated strings to Java strings. The charset constant is used
+                // instead of its name, so no checked UnsupportedEncodingException has to be handled.
+                sockStmr.setConverter(new SocketMessageConverter<String>() {
+                    @Override public String convert(byte[] msg) {
+                        return new String(msg, java.nio.charset.StandardCharsets.US_ASCII);
+                    }
+                });
+
+                sockStmr.setTupleExtractor(new StreamTupleExtractor<String, Integer, Long>() {
+                    @Override public Map.Entry<Integer, Long> extract(String input) {
+                        // Messages have the form "<key>=<count>", see sendData().
+                        String[] pair = input.split("=");
+
+                        return new IgniteBiTuple<>(Integer.parseInt(pair[0]), Long.parseLong(pair[1]));
+                    }
+                });
+
+                sockStmr.start();
+
+                sendData(addr, PORT);
+            }
+        }
+    }
+
+    /**
+     * Endlessly sends random "key=1" strings, each terminated with the delimiter.
+     * @param addr Address.
+     * @param port Port.
+     */
+    private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
+        try (Socket sock = new Socket(addr, port);
+             OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+            while (true) {
+                int key = RAND.nextInt(RANGE);
+
+                String str = key + "=1";
+
+                byte[] arr = str.getBytes(java.nio.charset.StandardCharsets.US_ASCII);
+
+                oos.write(arr);
+                // Terminate the message with the delimiter.
+                oos.write(DELIM);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
new file mode 100644
index 0000000..ae7bdf9
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples.
+ */
+package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
index 3f81dc4..a03d2c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridBufferedParser.java
@@ -18,7 +18,6 @@
package org.apache.ignite.internal.util.nio;
import org.apache.ignite.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
import java.io.*;
import java.nio.*;
@@ -33,9 +32,6 @@ import java.nio.*;
* | MSG_SIZE | MESSAGE | MSG_SIZE | MESSAGE |
* +--+--+--+--+--+--+...+--+--+--+--+--+--+--+...+--+
* </pre>
- * <p>
- * It expects that first 4 bytes in stream are {@link U#IGNITE_HEADER}. If beginning of a stream,
- * isn't equal to these bytes than exception will be thrown.
*/
public class GridBufferedParser implements GridNioParser {
/** Buffer metadata key. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
new file mode 100644
index 0000000..256597c
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDelimitedParser.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.*;
+
+import java.io.*;
+import java.nio.*;
+
+/**
+ * This class implements stream parser based on {@link GridNioDelimitedBuffer}.
+ * <p>
+ * The rule for this parser is that every message sent over the stream is appended with
+ * delimiter (bytes array). So, the stream structure is as follows:
+ * <pre>
+ * +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ * | MESSAGE | DELIMITER | MESSAGE | DELIMITER |
+ * +--+--+...+--+--+--+--+--+--+--+...+--+--+--+--+--+-
+ * </pre>
+ */
+public class GridDelimitedParser implements GridNioParser {
+    /** Session metadata key under which the per-session delimited buffer is kept. */
+    private static final int BUF_META_KEY = GridNioSessionMetaKey.nextUniqueKey();
+
+    /** Delimiter appended to every encoded message and used to split decoded input. */
+    private final byte[] delim;
+
+    /** Whether encoded messages are placed into direct buffers rather than heap buffers. */
+    private final boolean directBuf;
+
+    /**
+     * @param delim Delimiter.
+     * @param directBuf Direct buffer.
+     */
+    public GridDelimitedParser(byte[] delim, boolean directBuf) {
+        this.delim = delim;
+        this.directBuf = directBuf;
+    }
+
+    /** {@inheritDoc} */
+    @Override public byte[] decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+        GridNioDelimitedBuffer sesBuf = ses.meta(BUF_META_KEY);
+
+        if (sesBuf != null)
+            return sesBuf.read(buf);
+
+        // First call for this session: create the buffer and attach it to the session.
+        // Decode for a given session is called per one thread, so no other buffer is expected there.
+        sesBuf = new GridNioDelimitedBuffer(delim);
+
+        GridNioDelimitedBuffer prev = ses.addMeta(BUF_META_KEY, sesBuf);
+        assert prev == null;
+
+        return sesBuf.read(buf);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ByteBuffer encode(GridNioSession ses, Object msg) throws IOException, IgniteCheckedException {
+        byte[] payload = (byte[])msg;
+        int size = payload.length + delim.length;
+
+        ByteBuffer out = directBuf ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
+
+        // Message first, then the delimiter.
+        out.put(payload).put(delim);
+        // Switch the buffer from writing to reading.
+        out.flip();
+
+        return out;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return getClass().getSimpleName();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
new file mode 100644
index 0000000..2b764ec
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBuffer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.jetbrains.annotations.*;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Buffer with message delimiter support.
+ */
+public class GridNioDelimitedBuffer {
+    /** Delimiter that terminates every message. */
+    private final byte[] delim;
+
+    /** Bytes accumulated for the current message, including a partially matched delimiter. */
+    private byte[] data = new byte[16384];
+
+    /** Number of valid bytes in {@link #data}. */
+    private int cnt;
+
+    /** Number of leading delimiter bytes matched by the tail of {@link #data}. */
+    private int idx;
+
+    /** @param delim Delimiter, must be non-empty. */
+    public GridNioDelimitedBuffer(byte[] delim) {
+        assert delim != null;
+        assert delim.length > 0;
+
+        this.delim = delim;
+
+        reset();
+    }
+
+    /** Resets buffer state. */
+    private void reset() {
+        cnt = 0;
+        idx = 0;
+    }
+
+    /**
+     * Consumes bytes from the buffer until a complete message is found or the buffer is exhausted.
+     *
+     * @param buf Buffer.
+     * @return Message bytes (without delimiter) or {@code null} if message is not fully read yet.
+     */
+    @Nullable public byte[] read(ByteBuffer buf) {
+        while (buf.hasRemaining()) {
+            if (cnt == data.length)
+                data = Arrays.copyOf(data, data.length * 2);
+
+            byte b = buf.get();
+            data[cnt++] = b;
+
+            if (b == delim[idx])
+                idx++;
+            else if (idx > 0)
+                idx = matchedPrefixLength(idx);
+
+            if (idx == delim.length) {
+                byte[] bytes = Arrays.copyOfRange(data, 0, cnt - delim.length);
+                reset();
+
+                return bytes;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Finds the longest delimiter prefix the accumulated bytes end with after a partial match failed.
+     * A shorter match may overlap the failed one, e.g. delimiter {@code 1,2,1,3} and data {@code 1,2,1,2}.
+     * @param maxLen Longest prefix length still possible (length of the failed partial match).
+     * @return Length of the matched delimiter prefix, {@code 0} if there is none.
+     */
+    private int matchedPrefixLength(int maxLen) {
+        for (int len = maxLen; len > 0; len--) {
+            int off = cnt - len;
+            int i = 0;
+            while (i < len && data[off + i] == delim[i])
+                i++;
+
+            if (i == len)
+                return len;
+        }
+
+        return 0;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index c729362..b99521a 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -34,6 +34,9 @@ public abstract class StreamAdapter<T, K, V> {
/** Streamer. */
private IgniteDataStreamer<K, V> stmr;
+ /** Ignite. */
+ private Ignite ignite;
+
/**
* Empty constructor.
*/
@@ -81,6 +84,20 @@ public abstract class StreamAdapter<T, K, V> {
}
/**
+ * @return Provided {@link Ignite} instance.
+ */
+ public Ignite getIgnite() {
+ return ignite;
+ }
+
+ /**
+ * @param ignite {@link Ignite} instance.
+ */
+ public void setIgnite(Ignite ignite) {
+ this.ignite = ignite;
+ }
+
+ /**
* Converts given message to a tuple and adds it to the underlying streamer.
*
* @param msg Message to convert.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
new file mode 100644
index 0000000..66369ea
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.stream.adapters.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.nio.*;
+
+/**
+ * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
+ * streams into {@link IgniteDataStreamer} instance.
+ * <p>
+ * By default server uses size-based message processing. That is every message sent over the socket is prepended with
+ * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
+ * delimiter-based message processing will be used. That is every message sent over the socket is appended with
+ * provided delimiter.
+ * <p>
+ * Messages received through the socket are converted to Java objects using standard serialization. Conversion
+ * functionality can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages
+ * from non Java clients).
+ */
+public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+    /** Default threads. */
+    private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
+
+    /** Logger. Initialized in {@link #start()}. */
+    private IgniteLogger log;
+
+    /** Address. If {@code null}, the local host address is used. */
+    private InetAddress addr;
+
+    /** Server port. */
+    private int port;
+
+    /** Threads number. */
+    private int threads = DFLT_THREADS;
+
+    /** Direct mode. */
+    private boolean directMode;
+
+    /** Delimiter. If {@code null} or empty, size-based message processing is used. */
+    private byte[] delim;
+
+    /** Converter. If {@code null} at start, a JDK marshaller based converter is used. */
+    private SocketMessageConverter<T> converter;
+
+    /** Server. Created in {@link #start()}. */
+    private GridNioServer<byte[]> srv;
+
+    /**
+     * Sets server address.
+     *
+     * @param addr Address.
+     */
+    public void setAddr(InetAddress addr) {
+        this.addr = addr;
+    }
+
+    /**
+     * Sets port number.
+     *
+     * @param port Port.
+     */
+    public void setPort(int port) {
+        this.port = port;
+    }
+
+    /**
+     * Sets threads amount.
+     *
+     * @param threads Threads, must be positive.
+     */
+    public void setThreads(int threads) {
+        this.threads = threads;
+    }
+
+    /**
+     * Sets direct mode flag.
+     *
+     * @param directMode Direct mode.
+     */
+    public void setDirectMode(boolean directMode) {
+        this.directMode = directMode;
+    }
+
+    /**
+     * Sets message delimiter.
+     *
+     * @param delim Delimiter.
+     */
+    public void setDelimiter(byte[] delim) {
+        this.delim = delim;
+    }
+
+    /**
+     * Sets message converter.
+     *
+     * @param converter Converter.
+     */
+    public void setConverter(SocketMessageConverter<T> converter) {
+        this.converter = converter;
+    }
+
+    /**
+     * Starts streamer.
+     *
+     * @throws IgniteException If failed.
+     */
+    public void start() {
+        A.notNull(getTupleExtractor(), "tupleExtractor");
+        A.notNull(getStreamer(), "streamer");
+        A.notNull(getIgnite(), "ignite");
+        A.ensure(threads > 0, "threads > 0");
+
+        log = getIgnite().log();
+
+        // Install the default converter before the listener that uses it is created.
+        if (converter == null)
+            converter = new DefaultConverter<>();
+
+        GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
+            @Override public void onConnected(GridNioSession ses) {
+                assert ses.accepted();
+
+                if (log.isDebugEnabled())
+                    log.debug("Accepted connection: " + ses.remoteAddress());
+            }
+
+            @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+                if (e != null)
+                    log.error("Connection failed with exception", e);
+            }
+
+            @Override public void onMessage(GridNioSession ses, byte[] msg) {
+                addMessage(converter.convert(msg));
+            }
+        };
+
+        ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+        // No delimiter configured: messages are framed by a size header.
+        GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
+            new GridDelimitedParser(delim, directMode);
+
+        GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+        GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+        // Address the server is actually bound to; differs from the 'addr' field when the default is used.
+        InetAddress bindAddr;
+
+        try {
+            bindAddr = addr == null ? InetAddress.getLocalHost() : addr;
+
+            srv = new GridNioServer.Builder<byte[]>()
+                .address(bindAddr)
+                .port(port)
+                .listener(lsnr)
+                .logger(log)
+                .selectorCount(threads)
+                .byteOrder(byteOrder)
+                .filters(filters)
+                .build();
+        }
+        catch (IgniteCheckedException | UnknownHostException e) {
+            throw new IgniteException(e);
+        }
+
+        srv.start();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server started on " + bindAddr + ':' + port);
+    }
+
+    /**
+     * Stops streamer. Does nothing if the server has not been created by {@link #start()}.
+     */
+    public void stop() {
+        // start() may have never been called or may have failed before the server was built.
+        if (srv == null)
+            return;
+
+        srv.stop();
+
+        if (log.isDebugEnabled())
+            log.debug("Socket streaming server stopped");
+    }
+
+    /**
+     * Converts message to Java object using Jdk marshaller.
+     * <p>
+     * NOTE: this deserializes bytes received from the network. Java deserialization of untrusted input is
+     * unsafe, so either restrict access to the port to trusted clients or provide a custom
+     * {@link SocketMessageConverter}.
+     */
+    private static class DefaultConverter<T> implements SocketMessageConverter<T> {
+        /** Marshaller. */
+        private static final JdkMarshaller MARSH = new JdkMarshaller();
+
+        /** {@inheritDoc} */
+        @Override public T convert(byte[] msg) {
+            try {
+                return MARSH.unmarshal(msg, null);
+            }
+            catch (IgniteCheckedException e) {
+                throw new IgniteException(e);
+            }
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
new file mode 100644
index 0000000..8161d86
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+/**
+ * Socket message converter.
+ *
+ * @param <T> Type of the converted object.
+ */
+public interface SocketMessageConverter<T> {
+ /**
+ * Converts message represented by array of bytes to object.
+ *
+ * @param msg Message.
+ * @return Converted object.
+ */
+ public T convert(byte[] msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..e1cef65
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains socket streamer implementation.
+ */
+package org.apache.ignite.stream.socket;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
new file mode 100644
index 0000000..a0dd2e5
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/util/nio/GridNioDelimitedBufferTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import junit.framework.TestCase;
+
+import java.nio.*;
+import java.util.*;
+
+/**
+ * Tests for {@link GridNioDelimitedBuffer}.
+ */
+public class GridNioDelimitedBufferTest extends TestCase {
+    /** Charset name used to encode and decode test strings. */
+    private static final String ASCII = "ASCII";
+
+    /**
+     * Tests simple delimiter (excluded from alphabet): random lower-case strings separated by a zero byte.
+     *
+     * @throws Exception If failed.
+     */
+    public void testReadZString() throws Exception {
+        Random rnd = new Random();
+
+        List<String> msgs = new ArrayList<>(50);
+
+        for (int i = 0; i < 50; i++) {
+            int len = rnd.nextInt(128) + 1;
+
+            StringBuilder sb = new StringBuilder(len);
+
+            for (int j = 0; j < len; j++)
+                sb.append((char)(rnd.nextInt(26) + 'a'));
+
+            msgs.add(sb.toString());
+        }
+
+        checkRoundTrip(new byte[] {0}, msgs);
+    }
+
+    /**
+     * Tests compound delimiter (included to alphabet): messages contain the characters the delimiter is made of.
+     *
+     * @throws Exception If failed.
+     */
+    public void testDelim() throws Exception {
+        checkRoundTrip("aabb".getBytes(ASCII), Arrays.asList("za", "zaa", "zaab", "zab", "zaabaababbbbabaab"));
+    }
+
+    /**
+     * Writes every message followed by the delimiter into one buffer, reads the buffer back through
+     * {@link GridNioDelimitedBuffer} and checks that the same messages come out in the same order.
+     *
+     * @param delim Delimiter.
+     * @param expected Messages to send.
+     * @throws Exception If failed.
+     */
+    private void checkRoundTrip(byte[] delim, List<String> expected) throws Exception {
+        int size = 0;
+
+        for (String s : expected)
+            size += s.length() + delim.length;
+
+        ByteBuffer buf = ByteBuffer.allocate(size);
+
+        for (String s : expected) {
+            buf.put(s.getBytes(ASCII));
+            buf.put(delim);
+        }
+
+        buf.flip();
+
+        GridNioDelimitedBuffer delimBuf = new GridNioDelimitedBuffer(delim);
+
+        List<String> actual = new ArrayList<>(expected.size());
+
+        for (byte[] msg = delimBuf.read(buf); msg != null; msg = delimBuf.read(buf))
+            actual.add(new String(msg, ASCII));
+
+        assertEquals(expected, actual);
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
new file mode 100644
index 0000000..19852ce
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.stream.adapters.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests {@link IgniteSocketStreamer}.
+ * <p>
+ * Every test starts a socket streamer, sends {@link #CNT} tuples to it over a client socket using either
+ * size-based or delimiter-based framing, and checks that all of them end up in the cache.
+ */
+public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Grid count. */
+ private final static int GRID_CNT = 3;
+
+ /** Number of messages sent by every test. */
+ private static final int CNT = 500;
+
+ /** Delimiter used by the delimiter-based tests. */
+ private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0};
+
+ /** Port the streamer listens on; chosen in {@link #beforeTestsStarted()}. */
+ private static int port;
+
+ /** Ignite. */
+ private static Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration() throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration();
+
+ CacheConfiguration ccfg = cacheConfiguration(cfg, null);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ ignite = startGrids(GRID_CNT);
+ ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ // Let the OS pick a free port: bind an ephemeral server socket and release it right away.
+ // NOTE(review): the port could be taken by another process before the streamer binds to it - confirm
+ // this is acceptable for the test environment.
+ try (ServerSocket sock = new ServerSocket(0)) {
+ port = sock.getLocalPort();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ ignite.cache(null).clear();
+ }
+
+ /**
+ * Sends JDK-marshalled tuples, each prefixed with its length as 4 bytes (most significant byte first),
+ * and relies on the streamer's default converter.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSizeBasedDefaultConverter() throws Exception {
+ test(null, null, new Runnable() {
+ @Override public void run() {
+ try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+ Marshaller marsh = new JdkMarshaller();
+
+ for (int i = 0; i < CNT; i++) {
+ byte[] msg = marsh.marshal(new Tuple(i));
+
+ // Message size header.
+ os.write(msg.length >>> 24);
+ os.write(msg.length >>> 16);
+ os.write(msg.length >>> 8);
+ os.write(msg.length);
+
+ os.write(msg);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Sends 4-byte integers, each prefixed with a size header equal to 4, and decodes them with a custom converter.
+ *
+ * @throws Exception If failed.
+ */
+ public void testSizeBasedCustomConverter() throws Exception {
+ SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+ @Override public Tuple convert(byte[] msg) {
+ // Reassemble the integer from 4 bytes, most significant byte first.
+ int i = (msg[0] & 0xFF) << 24;
+ i |= (msg[1] & 0xFF) << 16;
+ i |= (msg[2] & 0xFF) << 8;
+ i |= msg[3] & 0xFF;
+
+ return new Tuple(i);
+ }
+ };
+
+ test(converter, null, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+ for (int i = 0; i < CNT; i++) {
+ // Message size header: payload is always 4 bytes long.
+ os.write(0);
+ os.write(0);
+ os.write(0);
+ os.write(4);
+
+ os.write(i >>> 24);
+ os.write(i >>> 16);
+ os.write(i >>> 8);
+ os.write(i);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Sends JDK-marshalled tuples, each followed by {@link #DELIM}, and relies on the streamer's default converter.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDelimiterBasedDefaultConverter() throws Exception {
+ test(null, DELIM, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+ Marshaller marsh = new JdkMarshaller();
+
+ for (int i = 0; i < CNT; i++) {
+ byte[] msg = marsh.marshal(new Tuple(i));
+
+ os.write(msg);
+ os.write(DELIM);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ }
+
+ /**
+ * Sends 4-byte integers, each followed by {@link #DELIM}, and decodes them with a custom converter.
+ *
+ * @throws Exception If failed.
+ */
+ public void testDelimiterBasedCustomConverter() throws Exception {
+ SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+ @Override public Tuple convert(byte[] msg) {
+ // Reassemble the integer from 4 bytes, most significant byte first.
+ int i = (msg[0] & 0xFF) << 24;
+ i |= (msg[1] & 0xFF) << 16;
+ i |= (msg[2] & 0xFF) << 8;
+ i |= msg[3] & 0xFF;
+
+ return new Tuple(i);
+ }
+ };
+
+ test(converter, DELIM, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+ for (int i = 0; i < CNT; i++) {
+ os.write(i >>> 24);
+ os.write(i >>> 16);
+ os.write(i >>> 8);
+ os.write(i);
+
+ os.write(DELIM);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * Starts a socket streamer, runs the given client and checks that {@link #CNT} entries were stored in the cache.
+ *
+ * @param converter Converter, or {@code null} to leave the streamer's converter unset.
+ * @param delim Message delimiter passed to the streamer, may be {@code null}.
+ * @param r Runnable that sends the messages to the streamer's port.
+ * @throws Exception If failed.
+ */
+ private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+ {
+ IgniteSocketStreamer<Tuple, Integer, String> sockStmr = null;
+
+ try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
+
+ stmr.allowOverwrite(true);
+ stmr.autoFlushFrequency(10);
+
+ sockStmr = new IgniteSocketStreamer<>();
+
+ IgniteCache<Integer, String> cache = ignite.cache(null);
+
+ sockStmr.setIgnite(ignite);
+
+ sockStmr.setStreamer(stmr);
+
+ sockStmr.setPort(port);
+
+ sockStmr.setDelimiter(delim);
+
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
+ @Override public Map.Entry<Integer, String> extract(Tuple msg) {
+ return new IgniteBiTuple<>(msg.key, msg.val);
+ }
+ });
+
+ if (converter != null)
+ sockStmr.setConverter(converter);
+
+ // One count per expected cache put.
+ final CountDownLatch latch = new CountDownLatch(CNT);
+
+ IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ // NOTE(review): the listener is not unregistered when the test ends - confirm this is intended.
+ ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+ sockStmr.start();
+
+ r.run();
+
+ // NOTE(review): waits without a timeout, so the test blocks here if fewer than CNT put events arrive.
+ latch.await();
+
+ assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
+
+ for (int i = 0; i < CNT; i++)
+ assertEquals(Integer.toString(i), cache.get(i));
+ }
+ finally {
+ if (sockStmr != null)
+ sockStmr.stop();
+ }
+
+ }
+
+ /**
+ * Message sent by the tests: an integer key and its decimal string representation as the value.
+ */
+ private static class Tuple implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Key. */
+ private final int key;
+
+ /** Value. */
+ private final String val;
+
+ /**
+ * @param key Key; the value is set to {@code Integer.toString(key)}.
+ */
+ Tuple(int key) {
+ this.key = key;
+ this.val = Integer.toString(key);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java
new file mode 100644
index 0000000..2e28469
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains tests for socket streamer.
+ */
+package org.apache.ignite.stream.socket;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
new file mode 100644
index 0000000..87bbfbb
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.testsuites;
+
+import org.apache.ignite.stream.socket.*;
+
+import junit.framework.*;
+
+/**
+ * Stream test suite. Currently runs {@link IgniteSocketStreamerSelfTest}.
+ */
+public class IgniteStreamTestSuite extends TestSuite {
+    /**
+     * Builds the suite.
+     *
+     * @return Stream tests suite.
+     * @throws Exception If failed.
+     */
+    public static TestSuite suite() throws Exception {
+        TestSuite streamSuite = new TestSuite("Ignite Stream Test Suite");
+
+        streamSuite.addTestSuite(IgniteSocketStreamerSelfTest.class);
+
+        return streamSuite;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/53995dcb/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
index 941b06e..32cd038 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteUtilSelfTestSuite.java
@@ -67,6 +67,7 @@ public class IgniteUtilSelfTestSuite extends TestSuite {
suite.addTestSuite(GridNioSelfTest.class);
suite.addTestSuite(GridNioFilterChainSelfTest.class);
suite.addTestSuite(GridNioSslSelfTest.class);
+ suite.addTestSuite(GridNioDelimitedBufferTest.class);
return suite;
}
[20/21] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-5' into ignite-sprint-5
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-5' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/94e202aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/94e202aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/94e202aa
Branch: refs/heads/ignite-866
Commit: 94e202aa7046de67a2c0e0da4230d424e1c95705
Parents: ee40819 15aa1cf
Author: Yakov Zhdanov <yz...@gridgain.com>
Authored: Fri May 15 17:53:29 2015 +0300
Committer: Yakov Zhdanov <yz...@gridgain.com>
Committed: Fri May 15 17:53:29 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 136 ++++++-------------
.../internal/interop/InteropBootstrap.java | 34 +++++
.../interop/InteropBootstrapFactory.java | 39 ++++++
.../internal/interop/InteropIgnition.java | 103 ++++++++++++++
.../internal/interop/InteropProcessor.java | 25 ++++
5 files changed, 240 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
[15/21] incubator-ignite git commit: ignite-430 Words count Socket
streamer examples
Posted by sb...@apache.org.
ignite-430 Words count Socket streamer examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/896b426b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/896b426b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/896b426b
Branch: refs/heads/ignite-866
Commit: 896b426bff3e37aeebbb00f54e179492543f1a1d
Parents: fe78d42
Author: agura <ag...@gridgain.com>
Authored: Thu May 14 20:29:33 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:40 2015 +0300
----------------------------------------------------------------------
.../streaming/StreamTransformerExample.java | 4 +-
.../streaming/StreamVisitorExample.java | 4 +-
.../socket/WordsSocketStreamerServer.java | 93 --------------------
.../streaming/wordcount/QueryWords.java | 4 +-
.../streaming/wordcount/StreamWords.java | 4 +-
.../socket/WordsSocketStreamerClient.java | 14 +--
.../socket/WordsSocketStreamerServer.java | 27 ++++--
7 files changed, 31 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java
index 5e95892..966fce2 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamTransformerExample.java
@@ -30,11 +30,9 @@ import java.util.*;
* Stream random numbers into the streaming cache.
* To start the example, you should:
* <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
* <li>Start streaming using {@link StreamTransformerExample}.</li>
* </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class StreamTransformerExample {
/** Random number generator. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
index 0fbce68..baae5af 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/StreamVisitorExample.java
@@ -31,11 +31,9 @@ import java.util.*;
* Stream random numbers into the streaming cache.
* To start the example, you should:
* <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
* <li>Start streaming using {@link StreamVisitorExample}.</li>
* </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class StreamVisitorExample {
/** Random number generator. */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
deleted file mode 100644
index 5af746d..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Receives words through socket using {@link SocketStreamer} and message size based protocol
- * and streams them into Ignite cache.
- * <p>
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
- * <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
- * <li>Start querying popular words using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class WordsSocketStreamerServer {
- /** Port. */
- private static final int PORT = 5555;
-
- /**
- * @param args Args.
- */
- public static void main(String[] args) throws InterruptedException, IOException {
- // Mark this cluster member as client.
- Ignition.setClientMode(true);
-
- Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
-
- if (!ExamplesUtils.hasServerNodes(ignite)) {
- ignite.close();
-
- return;
- }
-
- // The cache is configured with sliding window holding 1 second of the streaming data.
- IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
- IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
-
- InetAddress addr = InetAddress.getLocalHost();
-
- // Configure socket streamer
- SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
-
- sockStmr.setAddr(addr);
-
- sockStmr.setPort(PORT);
-
- sockStmr.setIgnite(ignite);
-
- sockStmr.setStreamer(stmr);
-
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
- @Override public Map.Entry<AffinityUuid, String> extract(String word) {
- // By using AffinityUuid we ensure that identical
- // words are processed on the same cluster node.
- return new IgniteBiTuple<>(new AffinityUuid(word), word);
- }
- });
-
- sockStmr.start();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
index faf8b51..58c6ef2 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -28,12 +28,10 @@ import java.util.*;
* Periodically query popular numbers from the streaming cache.
* To start the example, you should:
* <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
* <li>Start streaming using {@link StreamWords}.</li>
* <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class QueryWords {
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
index 26be178..6024c4b 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -27,12 +27,10 @@ import java.io.*;
* Stream words into Ignite cache.
* To start the example, you should:
* <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
* <li>Start streaming using {@link StreamWords}.</li>
* <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class StreamWords {
/**
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
index ea3beaa..c4d7b8c 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerClient.java
@@ -25,19 +25,19 @@ import java.io.*;
import java.net.*;
/**
- * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
- * Example illustrates usage of TCP socket streamer in case of non-Java clients.
- * In this example words are zero-terminated strings.
+ * Example demonstrates streaming of data from external components into Ignite cache.
+ * <p>
+ * {@code WordsSocketStreamerClient} is simple socket streaming client implementation that sends words to socket server
+ * based on {@link SocketStreamer} using message delimiter based protocol. Example illustrates usage of TCP socket
+ * streamer in case of non-Java clients. In this example words are zero-terminated strings.
* <p>
* To start the example, you should:
* <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
* <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
* <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
* <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class WordsSocketStreamerClient {
/** Port. */
@@ -59,7 +59,7 @@ public class WordsSocketStreamerClient {
System.out.println("Words streaming started.");
while (true) {
- try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+ try (InputStream in = WordsSocketStreamerClient.class.getResourceAsStream("../alice-in-wonderland.txt");
LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
for (String word : line.split(" ")) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/896b426b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index 259c925..6a8911c 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -30,19 +30,20 @@ import java.net.*;
import java.util.*;
/**
- * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
+ * Example demonstrates streaming of data from external components into Ignite cache.
+ * <p>
+ * {@code WordsSocketStreamerServer} is simple socket streaming server implementation that
+ * receives words from socket using {@link SocketStreamer} and message delimiter based protocol
* and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
* In this example words are zero-terminated strings.
* <p>
* To start the example, you should:
* <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start a few nodes using {@link ExampleNodeStartup}.</li>
* <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
* <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
* <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
*/
public class WordsSocketStreamerServer {
/** Port. */
@@ -52,9 +53,12 @@ public class WordsSocketStreamerServer {
private static final byte[] DELIM = new byte[] {0};
/**
- * @param args Args.
+ * Starts socket streaming server.
+ *
+ * @param args Command line arguments (none required).
+ * @throws Exception If failed.
*/
- public static void main(String[] args) throws InterruptedException, IOException {
+ public static void main(String[] args) throws Exception {
// Mark this cluster member as client.
Ignition.setClientMode(true);
@@ -106,6 +110,15 @@ public class WordsSocketStreamerServer {
}
});
- sockStmr.start();
+ try {
+ sockStmr.start();
+ }
+ catch (IgniteException e) {
+ System.out.println("Streaming server didn't start due to an error: ");
+
+ e.printStackTrace();
+
+ ignite.close();
+ }
}
}
[13/21] incubator-ignite git commit: ignite-430 Words count Socket
streamer examples
Posted by sb...@apache.org.
ignite-430 Words count Socket streamer examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/7ee85179
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/7ee85179
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/7ee85179
Branch: refs/heads/ignite-866
Commit: 7ee85179103df637ba594fab68755dee71b69997
Parents: d87efce
Author: agura <ag...@gridgain.com>
Authored: Wed May 13 20:56:22 2015 +0300
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:34 2015 +0300
----------------------------------------------------------------------
.../ignite/examples/streaming/package-info.java | 1 -
.../streaming/socket/SocketStreamerExample.java | 128 --------
.../socket/WordsSocketStreamerClient.java | 86 +++++
.../socket/WordsSocketStreamerServer.java | 93 ++++++
.../socket/ZStringsSocketStreamerExample.java | 141 ---------
.../socket/ZWordsSocketStreamerClient.java | 81 +++++
.../socket/ZWordsSocketStreamerServer.java | 111 +++++++
.../examples/streaming/socket/package-info.java | 3 +-
.../streaming/wordcount/CacheConfig.java | 2 +-
.../streaming/wordcount/QueryWords.java | 2 +-
.../streaming/wordcount/StreamWords.java | 2 +-
.../streaming/wordcount/package-info.java | 1 -
.../org/apache/ignite/stream/StreamAdapter.java | 111 +++++++
.../ignite/stream/StreamTupleExtractor.java | 33 ++
.../ignite/stream/adapters/StreamAdapter.java | 111 -------
.../stream/adapters/StreamTupleExtractor.java | 33 --
.../ignite/stream/adapters/package-info.java | 21 --
.../stream/socket/IgniteSocketStreamer.java | 217 -------------
.../ignite/stream/socket/SocketStreamer.java | 218 +++++++++++++
.../socket/IgniteSocketStreamerSelfTest.java | 315 -------------------
.../stream/socket/SocketStreamerSelfTest.java | 315 +++++++++++++++++++
.../testsuites/IgniteStreamTestSuite.java | 2 +-
22 files changed, 1053 insertions(+), 974 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
index 43dea13..43fbab3 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/package-info.java
@@ -16,7 +16,6 @@
*/
/**
- * <!-- Package description. -->
* Demonstrates usage of data streamer.
*/
package org.apache.ignite.examples.streaming;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
deleted file mode 100644
index 487572a..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/SocketStreamerExample.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message size based protocol.
- * <p>
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start streaming using {@link SocketStreamerExample}.</li>
- * <li>Start querying popular numbers using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class SocketStreamerExample {
- /** Port. */
- private static final int PORT = 5555;
-
- /**
- * @param args Args.
- */
- public static void main(String[] args) throws InterruptedException, IOException {
- // Mark this cluster member as client.
- Ignition.setClientMode(true);
-
- try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
- if (!ExamplesUtils.hasServerNodes(ignite))
- return;
-
- // The cache is configured with sliding window holding 1 second of the streaming data.
- IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
- try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
- InetAddress addr = InetAddress.getLocalHost();
-
- // Configure socket streamer
- IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
-
- sockStmr.setAddr(addr);
-
- sockStmr.setPort(PORT);
-
- sockStmr.setIgnite(ignite);
-
- sockStmr.setStreamer(stmr);
-
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
- @Override public Map.Entry<AffinityUuid, String> extract(String word) {
- // By using AffinityUuid we ensure that identical
- // words are processed on the same cluster node.
- return new IgniteBiTuple<>(new AffinityUuid(word), word);
- }
- });
-
- sockStmr.start();
-
- sendData(addr, PORT);
- }
- }
- }
-
- /**
- * @param addr Address.
- * @param port Port.
- */
- private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
- try (Socket sock = new Socket(addr, port);
- OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
- while (true) {
- try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
- LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
- for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
- for (String word : line.split(" ")) {
- if (!word.isEmpty()) {
- // Stream words into Ignite through socket.
- try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(bos)) {
-
- // Write message
- out.writeObject(word);
-
- byte[] arr = bos.toByteArray();
-
- // Write message length
- oos.write(arr.length >>> 24);
- oos.write(arr.length >>> 16);
- oos.write(arr.length >>> 8);
- oos.write(arr.length);
-
- oos.write(arr);
- }
- }
- }
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
new file mode 100644
index 0000000..c5ec079
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerClient.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Sends words to socket server based on {@link SocketStreamer} using message size based protocol.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerClient {
+ /** Port. */
+ private static final int PORT = 5555;
+
+ /**
+ * @param args Args.
+ */
+ public static void main(String[] args) throws IOException {
+ InetAddress addr = InetAddress.getLocalHost();
+
+ try (Socket sock = new Socket(addr, PORT);
+ OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+ System.out.println("Words streaming started.");
+
+ while (true) {
+ try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+ LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+ for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+ for (String word : line.split(" ")) {
+ if (!word.isEmpty()) {
+ // Stream words into Ignite through socket.
+ try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ ObjectOutputStream out = new ObjectOutputStream(bos)) {
+
+ // Write message
+ out.writeObject(word);
+
+ byte[] arr = bos.toByteArray();
+
+ // Write message length
+ oos.write(arr.length >>> 24);
+ oos.write(arr.length >>> 16);
+ oos.write(arr.length >>> 8);
+ oos.write(arr.length);
+
+ oos.write(arr);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
new file mode 100644
index 0000000..5af746d
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/WordsSocketStreamerServer.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Receives words through socket using {@link SocketStreamer} and message size based protocol
+ * and streams them into Ignite cache.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link WordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link WordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class WordsSocketStreamerServer {
+ /** Port. */
+ private static final int PORT = 5555;
+
+ /**
+ * @param args Args.
+ */
+ public static void main(String[] args) throws InterruptedException, IOException {
+ // Mark this cluster member as client.
+ Ignition.setClientMode(true);
+
+ Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
+
+ if (!ExamplesUtils.hasServerNodes(ignite)) {
+ ignite.close();
+
+ return;
+ }
+
+ // The cache is configured with sliding window holding 1 second of the streaming data.
+ IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+ IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
+
+ InetAddress addr = InetAddress.getLocalHost();
+
+ // Configure socket streamer
+ SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
+
+ sockStmr.setAddr(addr);
+
+ sockStmr.setPort(PORT);
+
+ sockStmr.setIgnite(ignite);
+
+ sockStmr.setStreamer(stmr);
+
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+ // By using AffinityUuid we ensure that identical
+ // words are processed on the same cluster node.
+ return new IgniteBiTuple<>(new AffinityUuid(word), word);
+ }
+ });
+
+ sockStmr.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
deleted file mode 100644
index fa5aa28..0000000
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZStringsSocketStreamerExample.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.examples.streaming.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.affinity.*;
-import org.apache.ignite.examples.*;
-import org.apache.ignite.examples.streaming.wordcount.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.stream.socket.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-
-/**
- * Stream words into Ignite cache through socket using {@link IgniteSocketStreamer} and message delimiter based
- * protocol.
- * <p>
- * Example illustrates usage of TCP socket streamer in case of non-Java clients. In this example client streams
- * zero-terminated strings.
- * <p>
- * To start the example, you should:
- * <ul>
- * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
- * <li>Start streaming using {@link ZStringsSocketStreamerExample}.</li>
- * <li>Start querying popular numbers using {@link QueryWords}.</li>
- * </ul>
- * <p>
- * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
- */
-public class ZStringsSocketStreamerExample {
- /** Port. */
- private static final int PORT = 5555;
-
- /** Delimiter. */
- private static final byte[] DELIM = new byte[] {0};
-
- /**
- * @param args Args.
- */
- public static void main(String[] args) throws InterruptedException, IOException {
- // Mark this cluster member as client.
- Ignition.setClientMode(true);
-
- try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
- if (!ExamplesUtils.hasServerNodes(ignite))
- return;
-
- // The cache is configured with sliding window holding 1 second of the streaming data.
- IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
-
- try (IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName())) {
- InetAddress addr = InetAddress.getLocalHost();
-
- // Configure socket streamer
- IgniteSocketStreamer<String, AffinityUuid, String> sockStmr = new IgniteSocketStreamer<>();
-
- sockStmr.setAddr(addr);
-
- sockStmr.setPort(PORT);
-
- sockStmr.setDelimiter(DELIM);
-
- sockStmr.setIgnite(ignite);
-
- sockStmr.setStreamer(stmr);
-
- // Converter from zero-terminated string to Java strings.
- sockStmr.setConverter(new SocketMessageConverter<String>() {
- @Override public String convert(byte[] msg) {
- try {
- return new String(msg, "ASCII");
- }
- catch (UnsupportedEncodingException e) {
- throw new IgniteException(e);
- }
- }
- });
-
- sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
- @Override public Map.Entry<AffinityUuid, String> extract(String word) {
- // By using AffinityUuid we ensure that identical
- // words are processed on the same cluster node.
- return new IgniteBiTuple<>(new AffinityUuid(word), word);
- }
- });
-
- sockStmr.start();
-
- sendData(addr, PORT);
- }
- }
- }
-
- /**
- * @param addr Address.
- * @param port Port.
- */
- private static void sendData(InetAddress addr, int port) throws IOException, InterruptedException {
- try (Socket sock = new Socket(addr, port);
- OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
-
- while (true) {
- try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
- LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
- for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
- for (String word : line.split(" ")) {
- if (!word.isEmpty()) {
- // Stream words into Ignite through socket.
- byte[] arr = word.getBytes("ASCII");
-
- // Write message
- oos.write(arr);
-
- // Write message delimiter
- oos.write(DELIM);
- }
- }
- }
- }
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
new file mode 100644
index 0000000..c17ccdc
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerClient.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+
+/**
+ * Sends words to socket server based on {@link SocketStreamer} using message delimiter based protocol.
+ * Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZWordsSocketStreamerClient {
+ /** TCP port on the local host that this client connects to. */
+ private static final int PORT = 5555;
+
+ /** Message delimiter: a single zero byte written after every word. */
+ private static final byte[] DELIM = new byte[] {0};
+
+ /**
+ * @param args Command line arguments (not used).
+ */
+ public static void main(String[] args) throws IOException {
+ InetAddress addr = InetAddress.getLocalHost();
+
+ try (Socket sock = new Socket(addr, PORT);
+ OutputStream oos = new BufferedOutputStream(sock.getOutputStream())) {
+
+ System.out.println("Words streaming started.");
+
+ while (true) { // Re-read and re-send the whole text; the loop only ends on an exception or process exit.
+ try (InputStream in = StreamWords.class.getResourceAsStream("../wordcount/alice-in-wonderland.txt");
+ LineNumberReader rdr = new LineNumberReader(new InputStreamReader(in))) {
+ for (String line = rdr.readLine(); line != null; line = rdr.readLine()) {
+ for (String word : line.split(" ")) {
+ if (!word.isEmpty()) {
+ // Encode the word as ASCII bytes to be streamed into Ignite through the socket.
+ byte[] arr = word.getBytes("ASCII");
+
+ // Write message payload (the word bytes).
+ oos.write(arr);
+
+ // Write message delimiter (zero byte) to terminate the word.
+ oos.write(DELIM);
+ }
+ }
+ }
+ }
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
new file mode 100644
index 0000000..a0ef9da
--- /dev/null
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/ZWordsSocketStreamerServer.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.examples.streaming.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.affinity.*;
+import org.apache.ignite.examples.*;
+import org.apache.ignite.examples.streaming.wordcount.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.stream.socket.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+
+/**
+ * Receives words through socket using {@link SocketStreamer} and message delimiter based protocol
+ * and streams them into Ignite cache. Example illustrates usage of TCP socket streamer in case of non-Java clients.
+ * In this example words are zero-terminated strings.
+ * <p>
+ * To start the example, you should:
+ * <ul>
+ * <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
+ * <li>Start socket server using {@link ZWordsSocketStreamerServer}.</li>
+ * <li>Start a few socket clients using {@link ZWordsSocketStreamerClient}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
+ * </ul>
+ * <p>
+ * You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
+ */
+public class ZWordsSocketStreamerServer {
+ /** TCP port passed to the socket streamer via {@code setPort}. */
+ private static final int PORT = 5555;
+
+ /** Message delimiter passed to the socket streamer: a single zero byte. */
+ private static final byte[] DELIM = new byte[] {0};
+
+ /**
+ * @param args Command line arguments (not used).
+ */
+ public static void main(String[] args) throws InterruptedException, IOException {
+ // Mark this cluster member as client.
+ Ignition.setClientMode(true);
+
+ Ignite ignite = Ignition.start("examples/config/example-ignite.xml");
+
+ if (!ExamplesUtils.hasServerNodes(ignite)) {
+ ignite.close();
+
+ return;
+ }
+
+ // The cache is configured with sliding window holding 1 second of the streaming data.
+ IgniteCache<AffinityUuid, String> stmCache = ignite.getOrCreateCache(CacheConfig.wordCache());
+
+ IgniteDataStreamer<AffinityUuid, String> stmr = ignite.dataStreamer(stmCache.getName());
+
+ InetAddress addr = InetAddress.getLocalHost();
+
+ // Configure socket streamer: address, port, delimiter, Ignite instance and target data streamer.
+ SocketStreamer<String, AffinityUuid, String> sockStmr = new SocketStreamer<>();
+
+ sockStmr.setAddr(addr);
+
+ sockStmr.setPort(PORT);
+
+ sockStmr.setDelimiter(DELIM);
+
+ sockStmr.setIgnite(ignite);
+
+ sockStmr.setStreamer(stmr);
+
+ // Converter from zero-terminated ASCII strings to Java strings.
+ sockStmr.setConverter(new SocketMessageConverter<String>() {
+ @Override public String convert(byte[] msg) {
+ try {
+ return new String(msg, "ASCII");
+ }
+ catch (UnsupportedEncodingException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<String, AffinityUuid, String>() {
+ @Override public Map.Entry<AffinityUuid, String> extract(String word) {
+ // By using AffinityUuid we ensure that identical
+ // words are processed on the same cluster node.
+ return new IgniteBiTuple<>(new AffinityUuid(word), word);
+ }
+ });
+
+ sockStmr.start();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
index d0a480a..c516ab4 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/socket/package-info.java
@@ -16,7 +16,6 @@
*/
/**
- * <!-- Package description. -->
- * Contains {@link org.apache.ignite.stream.socket.IgniteSocketStreamer} usage examples.
+ * Contains {@link org.apache.ignite.stream.socket.SocketStreamer} usage examples.
*/
package org.apache.ignite.examples.streaming.socket;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
index 58704ca..d17b97d 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/CacheConfig.java
@@ -26,7 +26,7 @@ import javax.cache.expiry.*;
import static java.util.concurrent.TimeUnit.*;
/**
- * Configuration for the streaming cache to store the stream of random numbers.
+ * Configuration for the streaming cache to store the stream of words.
* This cache is configured with sliding window of 1 second, which means that
* data older than 1 second will be automatically removed from the cache.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
index 3bd9d3d..149aa79 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/QueryWords.java
@@ -30,7 +30,7 @@ import java.util.*;
* <ul>
* <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
* <li>Start streaming using {@link StreamWords}.</li>
- * <li>Start querying popular numbers using {@link QueryWords}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
* <p>
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
index c59fa51..cc3c0cb 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/StreamWords.java
@@ -29,7 +29,7 @@ import java.io.*;
* <ul>
* <li>Start a few nodes using {@link ExampleNodeStartup} or by starting remote nodes as specified below.</li>
* <li>Start streaming using {@link StreamWords}.</li>
- * <li>Start querying popular numbers using {@link QueryWords}.</li>
+ * <li>Start querying popular words using {@link QueryWords}.</li>
* </ul>
* <p>
* You should start remote nodes by running {@link ExampleNodeStartup} in another JVM.
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
index 010f86a..5d48ae3 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/package-info.java
@@ -16,7 +16,6 @@
*/
/**
- * <!-- Package description. -->
* Streaming word count example.
*/
package org.apache.ignite.examples.streaming.wordcount;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
new file mode 100644
index 0000000..0c4e2d1
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamAdapter.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Convenience adapter for streamers. Adapters are optional components for
+ * streaming from different data sources. The purpose of adapters is to
+ * convert different message formats into Ignite stream key-value tuples
+ * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ */
+public abstract class StreamAdapter<T, K, V> {
+ /** Extractor that converts a message into a key-value tuple. */
+ private StreamTupleExtractor<T, K, V> extractor;
+
+ /** Data streamer that receives the extracted tuples. */
+ private IgniteDataStreamer<K, V> stmr;
+
+ /** Ignite instance. */
+ private Ignite ignite;
+
+ /**
+ * Empty constructor; streamer, extractor and Ignite instance are left unset until the setters are called.
+ */
+ protected StreamAdapter() {
+ // No-op.
+ }
+
+ /**
+ * Creates a stream adapter with the given streamer and tuple extractor.
+ *
+ * @param stmr Streamer.
+ * @param extractor Tuple extractor.
+ */
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+ this.stmr = stmr;
+ this.extractor = extractor;
+ }
+
+ /**
+ * @return Provided data streamer.
+ */
+ public IgniteDataStreamer<K, V> getStreamer() {
+ return stmr;
+ }
+
+ /**
+ * @param stmr Ignite data streamer.
+ */
+ public void setStreamer(IgniteDataStreamer<K, V> stmr) {
+ this.stmr = stmr;
+ }
+
+ /**
+ * @return Provided tuple extractor.
+ */
+ public StreamTupleExtractor<T, K, V> getTupleExtractor() {
+ return extractor;
+ }
+
+ /**
+ * @param extractor Extractor for key-value tuples from messages.
+ */
+ public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
+ this.extractor = extractor;
+ }
+
+ /**
+ * @return Provided {@link Ignite} instance.
+ */
+ public Ignite getIgnite() {
+ return ignite;
+ }
+
+ /**
+ * @param ignite {@link Ignite} instance.
+ */
+ public void setIgnite(Ignite ignite) {
+ this.ignite = ignite;
+ }
+
+ /**
+ * Converts given message to a tuple and adds it to the underlying streamer; a {@code null} tuple is skipped.
+ *
+ * @param msg Message to convert.
+ */
+ protected void addMessage(T msg) {
+ Map.Entry<K, V> e = extractor.extract(msg);
+
+ if (e != null)
+ stmr.addData(e);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
new file mode 100644
index 0000000..d2a4ede
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTupleExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream;
+
+import java.util.*;
+
+/**
+ * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ */
+public interface StreamTupleExtractor<T, K, V> {
+ /**
+ * Extracts a key-value tuple from a message.
+ *
+ * @param msg Message to extract the tuple from.
+ * @return Key-value tuple extracted from the message.
+ */
+ public Map.Entry<K, V> extract(T msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
deleted file mode 100644
index b99521a..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.adapters;
-
-import org.apache.ignite.*;
-
-import java.util.*;
-
-/**
- * Convenience adapter for streamers. Adapters are optional components for
- * streaming from different data sources. The purpose of adapters is to
- * convert different message formats into Ignite stream key-value tuples
- * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
- */
-public abstract class StreamAdapter<T, K, V> {
- /** Tuple extractor. */
- private StreamTupleExtractor<T, K, V> extractor;
-
- /** Streamer. */
- private IgniteDataStreamer<K, V> stmr;
-
- /** Ignite. */
- private Ignite ignite;
-
- /**
- * Empty constructor.
- */
- protected StreamAdapter() {
- // No-op.
- }
-
- /**
- * Stream adapter.
- *
- * @param stmr Streamer.
- * @param extractor Tuple extractor.
- */
- protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
- this.stmr = stmr;
- this.extractor = extractor;
- }
-
- /**
- * @return Provided data streamer.
- */
- public IgniteDataStreamer<K, V> getStreamer() {
- return stmr;
- }
-
- /**
- * @param stmr Ignite data streamer.
- */
- public void setStreamer(IgniteDataStreamer<K, V> stmr) {
- this.stmr = stmr;
- }
-
- /**
- * @return Provided tuple extractor.
- */
- public StreamTupleExtractor<T, K, V> getTupleExtractor() {
- return extractor;
- }
-
- /**
- * @param extractor Extractor for key-value tuples from messages.
- */
- public void setTupleExtractor(StreamTupleExtractor<T, K, V> extractor) {
- this.extractor = extractor;
- }
-
- /**
- * @return Provided {@link Ignite} instance.
- */
- public Ignite getIgnite() {
- return ignite;
- }
-
- /**
- * @param ignite {@link Ignite} instance.
- */
- public void setIgnite(Ignite ignite) {
- this.ignite = ignite;
- }
-
- /**
- * Converts given message to a tuple and adds it to the underlying streamer.
- *
- * @param msg Message to convert.
- */
- protected void addMessage(T msg) {
- Map.Entry<K, V> e = extractor.extract(msg);
-
- if (e != null)
- stmr.addData(e);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
deleted file mode 100644
index 9b0c395..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.adapters;
-
-import java.util.*;
-
-/**
- * Stream tuple extractor to convert messages to Ignite key-value tuples.
- */
-public interface StreamTupleExtractor<T, K, V> {
- /**
- * Extracts a key-value tuple from a message.
- *
- * @param msg Message.
- * @return Key-value tuple.
- */
- public Map.Entry<K, V> extract(T msg);
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
deleted file mode 100644
index a69ffc0..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Contains Ignite stream adapters.
- */
-package org.apache.ignite.stream.adapters;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
deleted file mode 100644
index 66369ea..0000000
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/IgniteSocketStreamer.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.internal.util.nio.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.stream.adapters.*;
-import org.jetbrains.annotations.*;
-
-import java.net.*;
-import java.nio.*;
-
-/**
- * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
- * streams into {@link IgniteDataStreamer} instance.
- * <p>
- * By default server uses size-based message processing. That is every message sent over the socket is prepended with
- * 4-byte integer header containing message size. If message delimiter is defined (see {@link #setDelimiter}) then
- * delimiter-based message processing will be used. That is every message sent over the socket is appended with
- * provided delimiter.
- * <p>
- * Received messages through socket converts to Java object using standard serialization. Conversion functionality
- * can be customized via user defined {@link SocketMessageConverter} (e.g. in order to convert messages from
- * non Java clients).
- */
-public class IgniteSocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
- /** Default threads. */
- private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
-
- /** Logger. */
- private IgniteLogger log;
-
- /** Address. */
- private InetAddress addr;
-
- /** Server port. */
- private int port;
-
- /** Threads number. */
- private int threads = DFLT_THREADS;
-
- /** Direct mode. */
- private boolean directMode;
-
- /** Delimiter. */
- private byte[] delim;
-
- /** Converter. */
- private SocketMessageConverter<T> converter;
-
- /** Server. */
- private GridNioServer<byte[]> srv;
-
- /**
- * Sets server address.
- *
- * @param addr Address.
- */
- public void setAddr(InetAddress addr) {
- this.addr = addr;
- }
-
- /**
- * Sets port number.
- *
- * @param port Port.
- */
- public void setPort(int port) {
- this.port = port;
- }
-
- /**
- * Sets threadds amount.
- *
- * @param threads Threads.
- */
- public void setThreads(int threads) {
- this.threads = threads;
- }
-
- /**
- * Sets direct mode flag.
- *
- * @param directMode Direct mode.
- */
- public void setDirectMode(boolean directMode) {
- this.directMode = directMode;
- }
-
- /**
- * Sets message delimiter.
- *
- * @param delim Delimiter.
- */
- public void setDelimiter(byte[] delim) {
- this.delim = delim;
- }
-
- /**
- * Sets message converter.
- *
- * @param converter Converter.
- */
- public void setConverter(SocketMessageConverter<T> converter) {
- this.converter = converter;
- }
-
- /**
- * Starts streamer.
- *
- * @throws IgniteException If failed.
- */
- public void start() {
- A.notNull(getTupleExtractor(), "tupleExtractor");
- A.notNull(getStreamer(), "streamer");
- A.notNull(getIgnite(), "ignite");
- A.ensure(threads > 0, "threads > 0");
-
- log = getIgnite().log();
-
- GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
- @Override public void onConnected(GridNioSession ses) {
- assert ses.accepted();
-
- if (log.isDebugEnabled())
- log.debug("Accepted connection: " + ses.remoteAddress());
- }
-
- @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
- if (e != null)
- log.error("Connection failed with exception", e);
- }
-
- @Override public void onMessage(GridNioSession ses, byte[] msg) {
- addMessage(converter.convert(msg));
- }
- };
-
- ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
-
- GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
- new GridDelimitedParser(delim, directMode);
-
- if (converter == null)
- converter = new DefaultConverter<>();
-
- GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
-
- GridNioFilter[] filters = new GridNioFilter[] {codec};
-
- try {
- srv = new GridNioServer.Builder<byte[]>()
- .address(addr == null ? InetAddress.getLocalHost() : addr)
- .port(port)
- .listener(lsnr)
- .logger(log)
- .selectorCount(threads)
- .byteOrder(byteOrder)
- .filters(filters)
- .build();
- }
- catch (IgniteCheckedException | UnknownHostException e) {
- throw new IgniteException(e);
- }
-
- srv.start();
-
- if (log.isDebugEnabled())
- log.debug("Socket streaming server started on " + addr + ':' + port);
- }
-
- /**
- * Stops streamer.
- */
- public void stop() {
- srv.stop();
-
- if (log.isDebugEnabled())
- log.debug("Socket streaming server stopped");
- }
-
- /**
- * Converts message to Java object using Jdk marshaller.
- */
- private static class DefaultConverter<T> implements SocketMessageConverter<T> {
- /** Marshaller. */
- private static final JdkMarshaller MARSH = new JdkMarshaller();
-
- /** {@inheritDoc} */
- @Override public T convert(byte[] msg) {
- try {
- return MARSH.unmarshal(msg, null);
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
new file mode 100644
index 0000000..07ce77e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.internal.util.nio.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.stream.*;
+
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.nio.*;
+
+/**
+ * Server that receives data from TCP socket, converts it to key-value pairs using {@link StreamTupleExtractor} and
+ * streams into {@link IgniteDataStreamer} instance.
+ * <p>
+ * By default the server uses size-based message processing. That is, every message sent over the socket is prepended
+ * with a 4-byte big-endian integer header containing the message size. If a message delimiter is defined (see
+ * {@link #setDelimiter}), delimiter-based message processing is used instead. That is, every message sent over the
+ * socket is followed by the provided delimiter.
+ * <p>
+ * Messages received through the socket are converted to Java objects using standard serialization. Conversion
+ * can be customized via a user-defined {@link SocketMessageConverter} (e.g. in order to convert messages from
+ * non-Java clients).
+ */
+public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
+ /** Default threads. */
+ private static final int DFLT_THREADS = Runtime.getRuntime().availableProcessors();
+
+ /** Logger. */
+ private IgniteLogger log;
+
+ /** Address. */
+ private InetAddress addr;
+
+ /** Server port. */
+ private int port;
+
+ /** Threads number. */
+ private int threads = DFLT_THREADS;
+
+ /** Direct mode. */
+ private boolean directMode;
+
+ /** Delimiter. */
+ private byte[] delim;
+
+ /** Converter. */
+ private SocketMessageConverter<T> converter;
+
+ /** Server. */
+ private GridNioServer<byte[]> srv;
+
+ /**
+ * Sets server address.
+ *
+ * @param addr Address.
+ */
+ public void setAddr(InetAddress addr) {
+ this.addr = addr;
+ }
+
+ /**
+ * Sets port number.
+ *
+ * @param port Port.
+ */
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ /**
+ * Sets the number of threads.
+ *
+ * @param threads Threads.
+ */
+ public void setThreads(int threads) {
+ this.threads = threads;
+ }
+
+ /**
+ * Sets direct mode flag.
+ *
+ * @param directMode Direct mode.
+ */
+ public void setDirectMode(boolean directMode) {
+ this.directMode = directMode;
+ }
+
+ /**
+ * Sets message delimiter.
+ *
+ * @param delim Delimiter.
+ */
+ public void setDelimiter(byte[] delim) {
+ this.delim = delim;
+ }
+
+ /**
+ * Sets message converter.
+ *
+ * @param converter Converter.
+ */
+ public void setConverter(SocketMessageConverter<T> converter) {
+ this.converter = converter;
+ }
+
+ /**
+ * Starts streamer.
+ *
+ * @throws IgniteException If failed.
+ */
+ public void start() {
+ A.notNull(getTupleExtractor(), "tupleExtractor");
+ A.notNull(getStreamer(), "streamer");
+ A.notNull(getIgnite(), "ignite");
+ A.ensure(threads > 0, "threads > 0");
+
+ log = getIgnite().log();
+
+ GridNioServerListener<byte[]> lsnr = new GridNioServerListenerAdapter<byte[]>() {
+ @Override public void onConnected(GridNioSession ses) {
+ assert ses.accepted();
+
+ if (log.isDebugEnabled())
+ log.debug("Accepted connection: " + ses.remoteAddress());
+ }
+
+ @Override public void onDisconnected(GridNioSession ses, @Nullable Exception e) {
+ if (e != null)
+ log.error("Connection failed with exception", e);
+ }
+
+ @Override public void onMessage(GridNioSession ses, byte[] msg) {
+ addMessage(converter.convert(msg));
+ }
+ };
+
+ ByteOrder byteOrder = ByteOrder.BIG_ENDIAN;
+
+ GridNioParser parser = F.isEmpty(delim) ? new GridBufferedParser(directMode, byteOrder) :
+ new GridDelimitedParser(delim, directMode);
+
+ if (converter == null)
+ converter = new DefaultConverter<>();
+
+ GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
+
+ GridNioFilter[] filters = new GridNioFilter[] {codec};
+
+ try {
+ srv = new GridNioServer.Builder<byte[]>()
+ .address(addr == null ? InetAddress.getLocalHost() : addr)
+ .port(port)
+ .listener(lsnr)
+ .logger(log)
+ .selectorCount(threads)
+ .byteOrder(byteOrder)
+ .filters(filters)
+ .build();
+ }
+ catch (IgniteCheckedException | UnknownHostException e) {
+ throw new IgniteException(e);
+ }
+
+ srv.start();
+
+ if (log.isDebugEnabled())
+ log.debug("Socket streaming server started on " + addr + ':' + port);
+ }
+
+ /**
+ * Stops streamer.
+ */
+ public void stop() {
+ srv.stop();
+
+ if (log.isDebugEnabled())
+ log.debug("Socket streaming server stopped");
+ }
+
+ /**
+ * Converts message to Java object using Jdk marshaller.
+ */
+ private static class DefaultConverter<T> implements SocketMessageConverter<T> {
+ /** Marshaller. */
+ private static final JdkMarshaller MARSH = new JdkMarshaller();
+
+ /** {@inheritDoc} */
+ @Override public T convert(byte[] msg) {
+ try {
+ return MARSH.unmarshal(msg, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
deleted file mode 100644
index 19852ce..0000000
--- a/modules/core/src/test/java/org/apache/ignite/stream/socket/IgniteSocketStreamerSelfTest.java
+++ /dev/null
@@ -1,315 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.stream.socket;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cache.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.events.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.marshaller.*;
-import org.apache.ignite.marshaller.jdk.*;
-import org.apache.ignite.spi.discovery.tcp.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
-import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
-import org.apache.ignite.stream.adapters.*;
-import org.apache.ignite.testframework.junits.common.*;
-
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-
-import static org.apache.ignite.events.EventType.*;
-
-/**
- * Tests {@link IgniteSocketStreamer}.
- */
-public class IgniteSocketStreamerSelfTest extends GridCommonAbstractTest {
- /** IP finder. */
- private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
-
- /** Grid count. */
- private final static int GRID_CNT = 3;
-
- /** Count. */
- private static final int CNT = 500;
-
- /** Delimiter. */
- private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0};
-
- /** Port. */
- private static int port;
-
- /** Ignite. */
- private static Ignite ignite;
-
- /** {@inheritDoc} */
- @Override protected IgniteConfiguration getConfiguration() throws Exception {
- IgniteConfiguration cfg = super.getConfiguration();
-
- CacheConfiguration ccfg = cacheConfiguration(cfg, null);
-
- cfg.setCacheConfiguration(ccfg);
-
- TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
-
- discoSpi.setIpFinder(IP_FINDER);
-
- cfg.setDiscoverySpi(discoSpi);
-
- return cfg;
- }
-
-
- /** {@inheritDoc} */
- @Override protected void beforeTestsStarted() throws Exception {
- ignite = startGrids(GRID_CNT);
- ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
-
- try (ServerSocket sock = new ServerSocket(0)) {
- port = sock.getLocalPort();
- }
- }
-
- /** {@inheritDoc} */
- @Override protected void afterTestsStopped() throws Exception {
- stopAllGrids();
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- ignite.cache(null).clear();
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testSizeBasedDefaultConverter() throws Exception {
- test(null, null, new Runnable() {
- @Override public void run() {
- try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
- OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
- Marshaller marsh = new JdkMarshaller();
-
- for (int i = 0; i < CNT; i++) {
- byte[] msg = marsh.marshal(new Tuple(i));
-
- os.write(msg.length >>> 24);
- os.write(msg.length >>> 16);
- os.write(msg.length >>> 8);
- os.write(msg.length);
-
- os.write(msg);
- }
- }
- catch (IOException | IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- });
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testSizeBasedCustomConverter() throws Exception {
- SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
- @Override public Tuple convert(byte[] msg) {
- int i = (msg[0] & 0xFF) << 24;
- i |= (msg[1] & 0xFF) << 16;
- i |= (msg[2] & 0xFF) << 8;
- i |= msg[3] & 0xFF;
-
- return new Tuple(i);
- }
- };
-
- test(converter, null, new Runnable() {
- @Override public void run() {
- try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
- OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
-
- for (int i = 0; i < CNT; i++) {
- os.write(0);
- os.write(0);
- os.write(0);
- os.write(4);
-
- os.write(i >>> 24);
- os.write(i >>> 16);
- os.write(i >>> 8);
- os.write(i);
- }
- }
- catch (IOException e) {
- throw new IgniteException(e);
- }
- }
- });
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDelimiterBasedDefaultConverter() throws Exception {
- test(null, DELIM, new Runnable() {
- @Override public void run() {
- try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
- OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
- Marshaller marsh = new JdkMarshaller();
-
- for (int i = 0; i < CNT; i++) {
- byte[] msg = marsh.marshal(new Tuple(i));
-
- os.write(msg);
- os.write(DELIM);
- }
- }
- catch (IOException | IgniteCheckedException e) {
- throw new IgniteException(e);
- }
- }
- });
-
- }
-
- /**
- * @throws Exception If failed.
- */
- public void testDelimiterBasedCustomConverter() throws Exception {
- SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
- @Override public Tuple convert(byte[] msg) {
- int i = (msg[0] & 0xFF) << 24;
- i |= (msg[1] & 0xFF) << 16;
- i |= (msg[2] & 0xFF) << 8;
- i |= msg[3] & 0xFF;
-
- return new Tuple(i);
- }
- };
-
- test(converter, DELIM, new Runnable() {
- @Override public void run() {
- try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
- OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
-
- for (int i = 0; i < CNT; i++) {
- os.write(i >>> 24);
- os.write(i >>> 16);
- os.write(i >>> 8);
- os.write(i);
-
- os.write(DELIM);
- }
- }
- catch (IOException e) {
- throw new IgniteException(e);
- }
- }
- });
- }
-
- /**
- * @param converter Converter.
- * @param r Runnable..
- */
- private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
- {
- IgniteSocketStreamer<Tuple, Integer, String> sockStmr = null;
-
- try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
-
- stmr.allowOverwrite(true);
- stmr.autoFlushFrequency(10);
-
- sockStmr = new IgniteSocketStreamer<>();
-
- IgniteCache<Integer, String> cache = ignite.cache(null);
-
- sockStmr.setIgnite(ignite);
-
- sockStmr.setStreamer(stmr);
-
- sockStmr.setPort(port);
-
- sockStmr.setDelimiter(delim);
-
- sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
- @Override public Map.Entry<Integer, String> extract(Tuple msg) {
- return new IgniteBiTuple<>(msg.key, msg.val);
- }
- });
-
- if (converter != null)
- sockStmr.setConverter(converter);
-
- final CountDownLatch latch = new CountDownLatch(CNT);
-
- IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
- @Override public boolean apply(UUID uuid, CacheEvent evt) {
- latch.countDown();
-
- return true;
- }
- };
-
- ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
-
- sockStmr.start();
-
- r.run();
-
- latch.await();
-
- assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
-
- for (int i = 0; i < CNT; i++)
- assertEquals(Integer.toString(i), cache.get(i));
- }
- finally {
- if (sockStmr != null)
- sockStmr.stop();
- }
-
- }
-
- /**
- * Tuple.
- */
- private static class Tuple implements Serializable {
- /** Serial version uid. */
- private static final long serialVersionUID = 0L;
-
- /** Key. */
- private final int key;
-
- /** Value. */
- private final String val;
-
- /**
- * @param key Key.
- */
- Tuple(int key) {
- this.key = key;
- this.val = Integer.toString(key);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
new file mode 100644
index 0000000..752e43c
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/stream/socket/SocketStreamerSelfTest.java
@@ -0,0 +1,315 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.socket;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cache.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.events.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.marshaller.*;
+import org.apache.ignite.marshaller.jdk.*;
+import org.apache.ignite.spi.discovery.tcp.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.*;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.*;
+import org.apache.ignite.stream.*;
+import org.apache.ignite.testframework.junits.common.*;
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+
+import static org.apache.ignite.events.EventType.*;
+
+/**
+ * Tests {@link SocketStreamer}.
+ */
+public class SocketStreamerSelfTest extends GridCommonAbstractTest {
+ /** IP finder. */
+ private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
+
+ /** Grid count. */
+ private final static int GRID_CNT = 3;
+
+ /** Count. */
+ private static final int CNT = 500;
+
+ /** Delimiter. */
+ private static final byte[] DELIM = new byte[] {0, 1, 2, 3, 4, 5, 4, 3, 2, 1, 0};
+
+ /** Port. */
+ private static int port;
+
+ /** Ignite. */
+ private static Ignite ignite;
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration() throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration();
+
+ CacheConfiguration ccfg = cacheConfiguration(cfg, null);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
+
+ discoSpi.setIpFinder(IP_FINDER);
+
+ cfg.setDiscoverySpi(discoSpi);
+
+ return cfg;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTestsStarted() throws Exception {
+ ignite = startGrids(GRID_CNT);
+ ignite.<Integer, String>getOrCreateCache(defaultCacheConfiguration());
+
+ try (ServerSocket sock = new ServerSocket(0)) {
+ port = sock.getLocalPort();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTestsStopped() throws Exception {
+ stopAllGrids();
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ ignite.cache(null).clear();
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSizeBasedDefaultConverter() throws Exception {
+ test(null, null, new Runnable() {
+ @Override public void run() {
+ try (Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+ Marshaller marsh = new JdkMarshaller();
+
+ for (int i = 0; i < CNT; i++) {
+ byte[] msg = marsh.marshal(new Tuple(i));
+
+ os.write(msg.length >>> 24);
+ os.write(msg.length >>> 16);
+ os.write(msg.length >>> 8);
+ os.write(msg.length);
+
+ os.write(msg);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testSizeBasedCustomConverter() throws Exception {
+ SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+ @Override public Tuple convert(byte[] msg) {
+ int i = (msg[0] & 0xFF) << 24;
+ i |= (msg[1] & 0xFF) << 16;
+ i |= (msg[2] & 0xFF) << 8;
+ i |= msg[3] & 0xFF;
+
+ return new Tuple(i);
+ }
+ };
+
+ test(converter, null, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+ for (int i = 0; i < CNT; i++) {
+ os.write(0);
+ os.write(0);
+ os.write(0);
+ os.write(4);
+
+ os.write(i >>> 24);
+ os.write(i >>> 16);
+ os.write(i >>> 8);
+ os.write(i);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDelimiterBasedDefaultConverter() throws Exception {
+ test(null, DELIM, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+ Marshaller marsh = new JdkMarshaller();
+
+ for (int i = 0; i < CNT; i++) {
+ byte[] msg = marsh.marshal(new Tuple(i));
+
+ os.write(msg);
+ os.write(DELIM);
+ }
+ }
+ catch (IOException | IgniteCheckedException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+
+ }
+
+ /**
+ * @throws Exception If failed.
+ */
+ public void testDelimiterBasedCustomConverter() throws Exception {
+ SocketMessageConverter<Tuple> converter = new SocketMessageConverter<Tuple>() {
+ @Override public Tuple convert(byte[] msg) {
+ int i = (msg[0] & 0xFF) << 24;
+ i |= (msg[1] & 0xFF) << 16;
+ i |= (msg[2] & 0xFF) << 8;
+ i |= msg[3] & 0xFF;
+
+ return new Tuple(i);
+ }
+ };
+
+ test(converter, DELIM, new Runnable() {
+ @Override public void run() {
+ try(Socket sock = new Socket(InetAddress.getLocalHost(), port);
+ OutputStream os = new BufferedOutputStream(sock.getOutputStream())) {
+
+ for (int i = 0; i < CNT; i++) {
+ os.write(i >>> 24);
+ os.write(i >>> 16);
+ os.write(i >>> 8);
+ os.write(i);
+
+ os.write(DELIM);
+ }
+ }
+ catch (IOException e) {
+ throw new IgniteException(e);
+ }
+ }
+ });
+ }
+
+ /**
+ * @param converter Converter.
+ * @param r Runnable.
+ */
+ private void test(@Nullable SocketMessageConverter<Tuple> converter, @Nullable byte[] delim, Runnable r) throws Exception
+ {
+ SocketStreamer<Tuple, Integer, String> sockStmr = null;
+
+ try (IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer(null)) {
+
+ stmr.allowOverwrite(true);
+ stmr.autoFlushFrequency(10);
+
+ sockStmr = new SocketStreamer<>();
+
+ IgniteCache<Integer, String> cache = ignite.cache(null);
+
+ sockStmr.setIgnite(ignite);
+
+ sockStmr.setStreamer(stmr);
+
+ sockStmr.setPort(port);
+
+ sockStmr.setDelimiter(delim);
+
+ sockStmr.setTupleExtractor(new StreamTupleExtractor<Tuple, Integer, String>() {
+ @Override public Map.Entry<Integer, String> extract(Tuple msg) {
+ return new IgniteBiTuple<>(msg.key, msg.val);
+ }
+ });
+
+ if (converter != null)
+ sockStmr.setConverter(converter);
+
+ final CountDownLatch latch = new CountDownLatch(CNT);
+
+ IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
+ @Override public boolean apply(UUID uuid, CacheEvent evt) {
+ latch.countDown();
+
+ return true;
+ }
+ };
+
+ ignite.events(ignite.cluster().forCacheNodes(null)).remoteListen(locLsnr, null, EVT_CACHE_OBJECT_PUT);
+
+ sockStmr.start();
+
+ r.run();
+
+ latch.await();
+
+ assertEquals(CNT, cache.size(CachePeekMode.PRIMARY));
+
+ for (int i = 0; i < CNT; i++)
+ assertEquals(Integer.toString(i), cache.get(i));
+ }
+ finally {
+ if (sockStmr != null)
+ sockStmr.stop();
+ }
+
+ }
+
+ /**
+ * Tuple.
+ */
+ private static class Tuple implements Serializable {
+ /** Serial version uid. */
+ private static final long serialVersionUID = 0L;
+
+ /** Key. */
+ private final int key;
+
+ /** Value. */
+ private final String val;
+
+ /**
+ * @param key Key.
+ */
+ Tuple(int key) {
+ this.key = key;
+ this.val = Integer.toString(key);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/7ee85179/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
index 87bbfbb..61be976 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteStreamTestSuite.java
@@ -32,7 +32,7 @@ public class IgniteStreamTestSuite extends TestSuite {
public static TestSuite suite() throws Exception {
TestSuite suite = new TestSuite("Ignite Stream Test Suite");
- suite.addTest(new TestSuite(IgniteSocketStreamerSelfTest.class));
+ suite.addTest(new TestSuite(SocketStreamerSelfTest.class));
return suite;
}
[04/21] incubator-ignite git commit: Merge remote-tracking branch
'origin/ignite-sprint-4' into ignite-sprint-4
Posted by sb...@apache.org.
Merge remote-tracking branch 'origin/ignite-sprint-4' into ignite-sprint-4
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/9c30fba4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/9c30fba4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/9c30fba4
Branch: refs/heads/ignite-866
Commit: 9c30fba4d817c1ac8b16351b29163a5eaec6d7d5
Parents: a8a0062 478cc7b
Author: avinogradov <av...@gridgain.com>
Authored: Tue May 12 20:07:39 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Tue May 12 20:07:39 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java | 8 ++++----
2 files changed, 5 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
[07/21] incubator-ignite git commit: # ignite-669 - streaming design.
Posted by sb...@apache.org.
# ignite-669 - streaming design.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/0cbe3c68
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/0cbe3c68
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/0cbe3c68
Branch: refs/heads/ignite-866
Commit: 0cbe3c68dcea09a39fd0e97909bbbaa6b1091a6b
Parents: 56e67e8
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Thu Apr 2 02:45:48 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:15 2015 +0300
----------------------------------------------------------------------
.../ignite/stream/adapters/StreamAdapter.java | 73 ++++++++++++++++++++
.../stream/adapters/StreamTupleExtractor.java | 33 +++++++++
.../ignite/stream/adapters/package-info.java | 21 ++++++
3 files changed, 127 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cbe3c68/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
new file mode 100644
index 0000000..02ae795
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.adapters;
+
+import org.apache.ignite.*;
+
+import java.util.*;
+
+/**
+ * Convenience adapter for streamers. Adapters are optional components for
+ * streaming from different data sources. The purpose of adapters is to
+ * convert different message formats into Ignite stream key-value tuples
+ * and feed the tuples into the provided {@link org.apache.ignite.IgniteDataStreamer}.
+ */
+public abstract class StreamAdapter<T, K, V> {
+ /** Tuple extractor. */
+ private final StreamTupleExtractor<T, K, V> extractor;
+
+ /** Streamer. */
+ private final IgniteDataStreamer<K, V> stmr;
+
+ /**
+ * Stream adapter.
+ *
+ * @param stmr Streamer.
+ * @param extractor Tuple extractor.
+ */
+ protected StreamAdapter(IgniteDataStreamer<K, V> stmr, StreamTupleExtractor<T, K, V> extractor) {
+ this.stmr = stmr;
+ this.extractor = extractor;
+ }
+
+ /**
+ * @return Provided data streamer.
+ */
+ public IgniteDataStreamer<K, V> streamer() {
+ return stmr;
+ }
+
+ /**
+ * @return Provided tuple extractor.
+ */
+ public StreamTupleExtractor<T, K, V> converter() {
+ return extractor;
+ }
+
+ /**
+ * Converts given message to a tuple and adds it to the underlying streamer.
+ *
+ * @param msg Message to convert.
+ */
+ protected void addMessage(T msg) {
+ Map.Entry<K, V> e = extractor.extract(msg);
+
+ if (e != null)
+ stmr.addData(e);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cbe3c68/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
new file mode 100644
index 0000000..9b0c395
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamTupleExtractor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.stream.adapters;
+
+import java.util.*;
+
+/**
+ * Stream tuple extractor to convert messages to Ignite key-value tuples.
+ */
+public interface StreamTupleExtractor<T, K, V> {
+ /**
+ * Extracts a key-value tuple from a message.
+ *
+ * @param msg Message.
+ * @return Key-value tuple.
+ */
+ public Map.Entry<K, V> extract(T msg);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/0cbe3c68/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
new file mode 100644
index 0000000..a69ffc0
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Contains Ignite stream adapters.
+ */
+package org.apache.ignite.stream.adapters;
[16/21] incubator-ignite git commit: Merge remote-tracking branch
'remotes/origin/ignite-sprint-4' into ignite-sprint-5
Posted by sb...@apache.org.
Merge remote-tracking branch 'remotes/origin/ignite-sprint-4' into ignite-sprint-5
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/04774b5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/04774b5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/04774b5f
Branch: refs/heads/ignite-866
Commit: 04774b5f2e2df2f83bef66d1a1c686662ebee04d
Parents: 896b426 9c30fba
Author: avinogradov <av...@gridgain.com>
Authored: Fri May 15 12:12:02 2015 +0300
Committer: avinogradov <av...@gridgain.com>
Committed: Fri May 15 12:12:02 2015 +0300
----------------------------------------------------------------------
.../ignite/spi/communication/tcp/TcpCommunicationSpi.java | 2 +-
.../ignite/spi/discovery/tcp/TcpDiscoverySpiAdapter.java | 8 ++++----
pom.xml | 2 ++
3 files changed, 7 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/04774b5f/pom.xml
----------------------------------------------------------------------
[08/21] incubator-ignite git commit: # ignite-669 - streaming design.
Posted by sb...@apache.org.
# ignite-669 - streaming design.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/be64e1dd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/be64e1dd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/be64e1dd
Branch: refs/heads/ignite-866
Commit: be64e1dd1f7aba02b664c4be6f2753cdafbbdba6
Parents: 0cbe3c6
Author: Dmitiry Setrakyan <ds...@gridgain.com>
Authored: Thu Apr 2 03:10:54 2015 -0700
Committer: agura <ag...@gridgain.com>
Committed: Fri May 15 03:44:18 2015 +0300
----------------------------------------------------------------------
.../ignite/stream/adapters/StreamAdapter.java | 29 +++++++++++++++++---
1 file changed, 25 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/be64e1dd/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
index 02ae795..f2e0da9 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/adapters/StreamAdapter.java
@@ -29,10 +29,17 @@ import java.util.*;
*/
public abstract class StreamAdapter<T, K, V> {
/** Tuple extractor. */
- private final StreamTupleExtractor<T, K, V> extractor;
+ private StreamTupleExtractor<T, K, V> extractor;
/** Streamer. */
- private final IgniteDataStreamer<K, V> stmr;
+ private IgniteDataStreamer<K, V> stmr;
+
+ /**
+ * Empty constructor.
+ */
+ public StreamAdapter() {
+ // No-op.
+ }
/**
* Stream adapter.
@@ -48,18 +55,32 @@ public abstract class StreamAdapter<T, K, V> {
/**
* @return Provided data streamer.
*/
- public IgniteDataStreamer<K, V> streamer() {
+ public IgniteDataStreamer<K, V> getStreamer() {
return stmr;
}
/**
+ * @param stmr Ignite data streamer.
+ */
+ public void setStreamer(IgniteDataStreamer<K, V> stmr) {
+ this.stmr = stmr;
+ }
+
+ /**
* @return Provided tuple extractor.
*/
- public StreamTupleExtractor<T, K, V> converter() {
+ public StreamTupleExtractor<T, K, V> getConverter() {
return extractor;
}
/**
+ * @param extractor Extractor for key-value tuples from messages.
+ */
+ public void setExtractor(StreamTupleExtractor<T, K, V> extractor) {
+ this.extractor = extractor;
+ }
+
+ /**
* Converts given message to a tuple and adds it to the underlying streamer.
*
* @param msg Message to convert.
[17/21] incubator-ignite git commit: # IGNITE-904: Implemented
interop start routine.
Posted by sb...@apache.org.
# IGNITE-904: Implemented interop start routine.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/15aa1cfa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/15aa1cfa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/15aa1cfa
Branch: refs/heads/ignite-866
Commit: 15aa1cfac4bd30c1d613c941d5afc994f34aa8e3
Parents: 04774b5f
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Fri May 15 17:28:34 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Fri May 15 17:28:34 2015 +0300
----------------------------------------------------------------------
.../org/apache/ignite/internal/IgnitionEx.java | 136 ++++++-------------
.../internal/interop/InteropBootstrap.java | 34 +++++
.../interop/InteropBootstrapFactory.java | 39 ++++++
.../internal/interop/InteropIgnition.java | 103 ++++++++++++++
.../internal/interop/InteropProcessor.java | 25 ++++
5 files changed, 240 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
index 8d88677..d54e06f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgnitionEx.java
@@ -532,22 +532,6 @@ public class IgnitionEx {
}
/**
- * Start Grid passing a closure which will modify configuration before it is passed to start routine.
- *
- * @param springCfgPath Spring config path.
- * @param gridName Grid name.
- * @param cfgClo Configuration closure.
- * @return Started Grid.
- * @throws IgniteCheckedException If failed.
- */
- public static Ignite startWithClosure(@Nullable String springCfgPath, @Nullable String gridName,
- IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo) throws IgniteCheckedException {
- URL url = U.resolveSpringUrl(springCfgPath);
-
- return start(url, gridName, null, cfgClo);
- }
-
- /**
* Loads all grid configurations specified within given Spring XML configuration file.
* <p>
* Usually Spring XML configuration file will contain only one Grid definition. Note that
@@ -734,7 +718,40 @@ public class IgnitionEx {
*/
public static Ignite start(URL springCfgUrl, @Nullable String gridName,
@Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException {
- return start(springCfgUrl, gridName, springCtx, null);
+ A.notNull(springCfgUrl, "springCfgUrl");
+
+ boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
+
+ IgniteBiTuple<Object, Object> t = null;
+
+ if (isLog4jUsed) {
+ try {
+ t = U.addLog4jNoOpLogger();
+ }
+ catch (IgniteCheckedException ignore) {
+ isLog4jUsed = false;
+ }
+ }
+
+ Collection<Handler> savedHnds = null;
+
+ if (!isLog4jUsed)
+ savedHnds = U.addJavaNoOpLogger();
+
+ IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> cfgMap;
+
+ try {
+ cfgMap = loadConfigurations(springCfgUrl);
+ }
+ finally {
+ if (isLog4jUsed && t != null)
+ U.removeLog4jNoOpLogger(t);
+
+ if (!isLog4jUsed)
+ U.removeJavaNoOpLogger(savedHnds);
+ }
+
+ return startConfigurations(cfgMap, springCfgUrl, gridName, springCtx);
}
/**
@@ -780,73 +797,6 @@ public class IgnitionEx {
*/
public static Ignite start(InputStream springCfgStream, @Nullable String gridName,
@Nullable GridSpringResourceContext springCtx) throws IgniteCheckedException {
- return start(springCfgStream, gridName, springCtx, null);
- }
-
- /**
- * Internal Spring-based start routine.
- *
- * @param springCfgUrl Spring XML configuration file URL. This cannot be {@code null}.
- * @param gridName Grid name that will override default.
- * @param springCtx Optional Spring application context.
- * @param cfgClo Optional closure to change configuration before it is used to start the grid.
- * @return Started grid.
- * @throws IgniteCheckedException If failed.
- */
- private static Ignite start(final URL springCfgUrl, @Nullable String gridName,
- @Nullable GridSpringResourceContext springCtx,
- @Nullable IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo)
- throws IgniteCheckedException {
- A.notNull(springCfgUrl, "springCfgUrl");
-
- boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
-
- IgniteBiTuple<Object, Object> t = null;
-
- if (isLog4jUsed) {
- try {
- t = U.addLog4jNoOpLogger();
- }
- catch (IgniteCheckedException ignore) {
- isLog4jUsed = false;
- }
- }
-
- Collection<Handler> savedHnds = null;
-
- if (!isLog4jUsed)
- savedHnds = U.addJavaNoOpLogger();
-
- IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> cfgMap;
-
- try {
- cfgMap = loadConfigurations(springCfgUrl);
- }
- finally {
- if (isLog4jUsed && t != null)
- U.removeLog4jNoOpLogger(t);
-
- if (!isLog4jUsed)
- U.removeJavaNoOpLogger(savedHnds);
- }
-
- return startConfigurations(cfgMap, springCfgUrl, gridName, springCtx, cfgClo);
- }
-
- /**
- * Internal Spring-based start routine.
- *
- * @param springCfgStream Input stream containing Spring XML configuration. This cannot be {@code null}.
- * @param gridName Grid name that will override default.
- * @param springCtx Optional Spring application context.
- * @param cfgClo Optional closure to change configuration before it is used to start the grid.
- * @return Started grid.
- * @throws IgniteCheckedException If failed.
- */
- private static Ignite start(final InputStream springCfgStream, @Nullable String gridName,
- @Nullable GridSpringResourceContext springCtx,
- @Nullable IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo)
- throws IgniteCheckedException {
A.notNull(springCfgStream, "springCfgUrl");
boolean isLog4jUsed = U.gridClassLoader().getResource("org/apache/log4j/Appender.class") != null;
@@ -880,7 +830,7 @@ public class IgnitionEx {
U.removeJavaNoOpLogger(savedHnds);
}
- return startConfigurations(cfgMap, null, gridName, springCtx, cfgClo);
+ return startConfigurations(cfgMap, null, gridName, springCtx);
}
/**
@@ -890,7 +840,6 @@ public class IgnitionEx {
* @param springCfgUrl Spring XML configuration file URL.
* @param gridName Grid name that will override default.
* @param springCtx Optional Spring application context.
- * @param cfgClo Optional closure to change configuration before it is used to start the grid.
* @return Started grid.
* @throws IgniteCheckedException If failed.
*/
@@ -898,8 +847,7 @@ public class IgnitionEx {
IgniteBiTuple<Collection<IgniteConfiguration>, ? extends GridSpringResourceContext> cfgMap,
URL springCfgUrl,
@Nullable String gridName,
- @Nullable GridSpringResourceContext springCtx,
- @Nullable IgniteClosure<IgniteConfiguration, IgniteConfiguration> cfgClo)
+ @Nullable GridSpringResourceContext springCtx)
throws IgniteCheckedException {
List<IgniteNamedInstance> grids = new ArrayList<>(cfgMap.size());
@@ -910,12 +858,6 @@ public class IgnitionEx {
if (cfg.getGridName() == null && !F.isEmpty(gridName))
cfg.setGridName(gridName);
- if (cfgClo != null) {
- cfg = cfgClo.apply(cfg);
-
- assert cfg != null;
- }
-
// Use either user defined context or our one.
IgniteNamedInstance grid = start0(
new GridStartContext(cfg, springCfgUrl, springCtx == null ? cfgMap.get2() : springCtx));
@@ -1600,9 +1542,9 @@ public class IgnitionEx {
igfsExecSvc, restExecSvc,
new CA() {
@Override public void apply() {
- startLatch.countDown();
- }
- });
+ startLatch.countDown();
+ }
+ });
state = STARTED;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrap.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrap.java
new file mode 100644
index 0000000..820bef9
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrap.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.interop;
+
+import org.apache.ignite.configuration.*;
+
+/**
+ * Interop bootstrap. Responsible for starting Ignite node in interop mode.
+ */
+public interface InteropBootstrap {
+ /**
+ * Start Ignite node.
+ *
+ * @param cfg Configuration.
+ * @param envPtr Environment pointer.
+ * @return Interop processor.
+ */
+ public InteropProcessor start(IgniteConfiguration cfg, long envPtr);
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrapFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrapFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrapFactory.java
new file mode 100644
index 0000000..b61ca89
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropBootstrapFactory.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.interop;
+
+import java.io.*;
+
+/**
+ * Interop bootstrap factory.
+ */
+public interface InteropBootstrapFactory extends Serializable {
+ /**
+ * Get bootstrap factory ID.
+ *
+ * @return ID.
+ */
+ public int id();
+
+ /**
+ * Create bootstrap instance.
+ *
+ * @return Bootstrap instance.
+ */
+ public InteropBootstrap create();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
new file mode 100644
index 0000000..f245122
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropIgnition.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.interop;
+
+import org.apache.ignite.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.resource.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.jetbrains.annotations.*;
+
+import java.net.*;
+import java.security.*;
+import java.util.*;
+
+/**
+ * Entry point for interop nodes.
+ */
+@SuppressWarnings("UnusedDeclaration")
+public class InteropIgnition {
+ /**
+ * Start Ignite node in interop mode.
+ *
+ * @param springCfgPath Spring configuration path.
+ * @param gridName Grid name.
+ * @param factoryId Factory ID.
+ * @param envPtr Environment pointer.
+ * @return Interop processor.
+ */
+ public static InteropProcessor start(@Nullable String springCfgPath, @Nullable String gridName, int factoryId,
+ long envPtr) {
+ IgniteConfiguration cfg = configuration(springCfgPath);
+
+ if (gridName != null)
+ cfg.setGridName(gridName);
+
+ InteropBootstrap bootstrap = bootstrap(factoryId);
+
+ return bootstrap.start(cfg, envPtr);
+ }
+
+ private static IgniteConfiguration configuration(@Nullable String springCfgPath) {
+ try {
+ URL url = springCfgPath == null ? U.resolveIgniteUrl(IgnitionEx.DFLT_CFG) :
+ U.resolveSpringUrl(springCfgPath);
+
+ IgniteBiTuple<IgniteConfiguration, GridSpringResourceContext> t = IgnitionEx.loadConfiguration(url);
+
+ return t.get1();
+ }
+ catch (IgniteCheckedException e) {
+ throw new IgniteException("Failed to instantiate configuration from Spring XML: " + springCfgPath, e);
+ }
+ }
+
+ /**
+ * Create bootstrap for the given factory ID.
+ *
+ * @param factoryId Factory ID.
+ * @return Bootstrap.
+ */
+ private static InteropBootstrap bootstrap(final int factoryId) {
+ InteropBootstrapFactory factory = AccessController.doPrivileged(
+ new PrivilegedAction<InteropBootstrapFactory>() {
+ @Override public InteropBootstrapFactory run() {
+ for (InteropBootstrapFactory factory : ServiceLoader.load(InteropBootstrapFactory.class)) {
+ if (factory.id() == factoryId)
+ return factory;
+ }
+
+ return null;
+ }
+ });
+
+ if (factory == null)
+ throw new IgniteException("Interop factory is not found (did you put into the classpath?): " + factoryId);
+
+ return factory.create();
+ }
+
+ /**
+ * Private constructor.
+ */
+ private InteropIgnition() {
+ // No-op.
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/15aa1cfa/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropProcessor.java
new file mode 100644
index 0000000..6c55296
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/interop/InteropProcessor.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.interop;
+
+/**
+ * Interop processor.
+ */
+public interface InteropProcessor {
+ // No-op.
+}