You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/07 11:17:11 UTC

[26/50] [abbrv] ignite git commit: WIP

WIP


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/db8a8b69
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/db8a8b69
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/db8a8b69

Branch: refs/heads/ignite-2649
Commit: db8a8b694675ff654fed6c6f11b20b3f4120a995
Parents: 34a3ada
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Sep 7 12:09:53 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Sep 7 12:09:53 2016 +0300

----------------------------------------------------------------------
 .../socket/WordsSocketStreamerServer.java       |  2 +-
 .../stream/socket/SocketMessageConverter.java   |  3 +--
 .../ignite/stream/socket/SocketStreamer.java    | 20 +++++++++++++++-----
 3 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a8b69/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
index b104b34..814d235 100644
--- a/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
+++ b/examples/src/main/java/org/apache/ignite/examples/streaming/wordcount/socket/WordsSocketStreamerServer.java
@@ -99,7 +99,7 @@ public class WordsSocketStreamerServer {
 
         // Converter from zero-terminated string to Java strings.
         sockStmr.setConverter(new SocketMessageConverter<String>() {
-            @Override public String convert(byte[] msg, String gridName) {
+            @Override public String convert(byte[] msg) {
                 try {
                     return new String(msg, "ASCII");
                 }

http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a8b69/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
index 9e2ca67..03e82e3 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketMessageConverter.java
@@ -25,8 +25,7 @@ public interface SocketMessageConverter<T> {
      * Converter message represented by array of bytes to object.
      *
      * @param msg Message.
-     * @param gridName Grid name.
      * @return Converted object.
      */
-    public T convert(byte[] msg, String gridName);
+    public T convert(byte[] msg);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/ignite/blob/db8a8b69/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
index 818ae6a..1e79e1c 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/socket/SocketStreamer.java
@@ -35,6 +35,7 @@ import org.apache.ignite.internal.util.nio.GridNioServerListenerAdapter;
 import org.apache.ignite.internal.util.nio.GridNioSession;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.internal.util.typedef.internal.A;
+import org.apache.ignite.marshaller.Marshaller;
 import org.apache.ignite.marshaller.MarshallerUtils;
 import org.apache.ignite.marshaller.jdk.JdkMarshaller;
 import org.apache.ignite.stream.StreamAdapter;
@@ -164,7 +165,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
             }
 
             @Override public void onMessage(GridNioSession ses, byte[] msg) {
-                addMessage(converter.convert(msg, ses.gridName()));
+                addMessage(converter.convert(msg));
             }
         };
 
@@ -174,7 +175,7 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
             new GridDelimitedParser(delim, directMode);
 
         if (converter == null)
-            converter = new DefaultConverter<>();
+            converter = new DefaultConverter<>(getIgnite().name());
 
         GridNioFilter codec = new GridNioCodecFilter(parser, log, directMode);
 
@@ -216,12 +217,21 @@ public class SocketStreamer<T, K, V> extends StreamAdapter<T, K, V> {
      */
     private static class DefaultConverter<T> implements SocketMessageConverter<T> {
         /** Marshaller. */
-        private static final JdkMarshaller MARSH = new JdkMarshaller();
+        private final Marshaller marsh;
+
+        /**
+         * Constructor.
+         *
+         * @param gridName Grid name.
+         */
+        private DefaultConverter(@Nullable String gridName) {
+            marsh = MarshallerUtils.withNodeName(new JdkMarshaller(), gridName);
+        }
 
         /** {@inheritDoc} */
-        @Override public T convert(byte[] msg, final String gridName) {
+        @Override public T convert(byte[] msg) {
             try {
-                return MarshallerUtils.unmarshal(gridName, MARSH, msg, null);
+                return MarshallerUtils.unmarshal(gridName, marsh, msg, null);
             }
             catch (IgniteCheckedException e) {
                 throw new IgniteException(e);