You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/07 11:17:11 UTC
[26/50] [abbrv] ignite git commit: WIP
WIP
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db8a8b69
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db8a8b69
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db8a8b69
Branch: refs/heads/ignite-2649
Commit: db8a8b694675ff654fed6c6f11b20b3f4120a995
Parents: 34a3ada
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 7 12:09:53 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 7 12:09:53 2016 +0300
----------------------------------------------------------------------
.../socket/WordsSocketStreamerServer.java | 2 +-
.../stream/socket/SocketMessageConverter.java | 3 +--
.../ignite/stream/socket/SocketStreamer.java | 20 +++++++++++++++-----
3 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a8b69/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index b104b34..814d235 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -99,7 +99,7 @@ public class WordsSocketStreamerServer {
// Converter from zero-terminated string to Java strings.
sockStmr.setConverter(new SocketMessageConverter<String>() {
- @Override public String convert(byte[] msg, String gridName) {
+ @Override public String convert(byte[] msg) {
try {
return new String(msg, "ASCII");
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a8b69/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
index 9e2ca67..03e82e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
@@ -25,8 +25,7 @@ public interface SocketMessageConverter<T> {
* Converter message represented by array of bytes to object.
*
* @param msg Message.
- * @param gridName Grid name.
* @return Converted object.
*/
- public T convert(byte[] msg, String gridName);
+ public T convert(byte[] msg);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a8b69/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 818ae6a..1e79e1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
import org.apache.ignite.internal.util.nio.GridNioSession;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.marshaller.MarshallerUtils;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
import org.apache.ignite.stream.StreamAdapter;
@@ -164,7 +165,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
}
@Override public void onMessage(GridNioSession ses, byte[] msg) {
- addMessage(converter.convert(msg, ses.gridName()));
+ addMessage(converter.convert(msg));
}
};
@@ -174,7 +175,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
new GridDelimitedParser(delim, directMode);
if (converter == null)
- converter = new DefaultConverter<>();
+ converter = new DefaultConverter<>(getIgnite().name());
GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
@@ -216,12 +217,21 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
*/
private static class DefaultConverter<T> implements SocketMessageConverter<T> {
/** Marshaller. */
- private static final JdkMarshaller MARSH = new JdkMarshaller();
+ private final Marshaller marsh;
+
+ /**
+ * Constructor.
+ *
+ * @param gridName Grid name.
+ */
+ private DefaultConverter(@Nullable String gridName) {
+ marsh = MarshallerUtils.withNodeName(new JdkMarshaller(), gridName);
+ }
/** {@inheritDoc} */
- @Override public T convert(byte[] msg, final String gridName) {
+ @Override public T convert(byte[] msg) {
try {
- return MarshallerUtils.unmarshal(gridName, MARSH, msg, null);
+ return MarshallerUtils.unmarshal(gridName, marsh, msg, null);
}
catch (IgniteCheckedException e) {
throw new IgniteException(e);