You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by yz...@apache.org on 2015/11/21 15:53:44 UTC

[5/9] ignite git commit: Direct marshalling optimizations

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
index 987090d..aa88808 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridDirectParser.java
@@ -23,7 +23,6 @@ import java.nio.ByteBuffer;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
 import org.jetbrains.annotations.Nullable;
 
@@ -41,46 +40,48 @@ public class GridDirectParser implements GridNioParser {
     private final MessageFactory msgFactory;
 
     /** */
-    private final MessageFormatter formatter;
+    private final GridNioMessageReaderFactory readerFactory;
 
     /**
      * @param msgFactory Message factory.
-     * @param formatter Formatter.
+     * @param readerFactory Message reader factory.
      */
-    public GridDirectParser(MessageFactory msgFactory, MessageFormatter formatter) {
+    public GridDirectParser(MessageFactory msgFactory, GridNioMessageReaderFactory readerFactory) {
         assert msgFactory != null;
-        assert formatter != null;
+        assert readerFactory != null;
 
         this.msgFactory = msgFactory;
-        this.formatter = formatter;
+        this.readerFactory = readerFactory;
     }
 
     /** {@inheritDoc} */
     @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf)
         throws IOException, IgniteCheckedException {
-        Message msg = ses.removeMeta(MSG_META_KEY);
+        MessageReader reader = ses.meta(READER_META_KEY);
 
-        MessageReader reader = null;
+        if (reader == null)
+            ses.addMeta(READER_META_KEY, reader = readerFactory.reader(ses, msgFactory));
 
-        if (msg == null && buf.hasRemaining()) {
-            msg = msgFactory.create(buf.get());
+        Message msg = ses.removeMeta(MSG_META_KEY);
 
-            ses.addMeta(READER_META_KEY, reader = formatter.reader(msgFactory, msg.getClass()));
-        }
+        if (msg == null && buf.hasRemaining())
+            msg = msgFactory.create(buf.get());
 
         boolean finished = false;
 
         if (buf.hasRemaining()) {
-            if (reader == null)
-                reader = ses.meta(READER_META_KEY);
-
-            assert reader != null;
+            if (reader != null)
+                reader.setCurrentReadClass(msg.getClass());
 
             finished = msg.readFrom(buf, reader);
         }
 
-        if (finished)
+        if (finished) {
+            if (reader != null)
+                reader.reset();
+
             return msg;
+        }
         else {
             ses.addMeta(MSG_META_KEY, msg);
 
@@ -93,4 +94,4 @@ public class GridDirectParser implements GridNioParser {
         // No encoding needed for direct messages.
         throw new UnsupportedEncodingException();
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
new file mode 100644
index 0000000..cb7e62d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageReaderFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+
+/**
+ * Message reader factory.
+ */
+public interface GridNioMessageReaderFactory {
+    /**
+     * Creates new reader.
+     *
+     * @param ses Current session.
+     * @param msgFactory Message factory.
+     * @return Reader.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public MessageReader reader(GridNioSession ses, MessageFactory msgFactory) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
new file mode 100644
index 0000000..6c4dee4
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioMessageWriterFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Message writer factory.
+ */
+public interface GridNioMessageWriterFactory {
+    /**
+     * Creates new writer.
+     *
+     * @param ses Current session.
+     * @return Writer.
+     * @throws IgniteCheckedException In case of error.
+     */
+    public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
index 5bd08e7..384ee1a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridNioServer.java
@@ -62,7 +62,6 @@ import org.apache.ignite.lang.IgniteBiInClosure;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.lang.IgnitePredicate;
 import org.apache.ignite.plugin.extensions.communication.Message;
-import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 import org.apache.ignite.thread.IgniteThread;
 import org.jetbrains.annotations.Nullable;
@@ -176,7 +175,7 @@ public class GridNioServer<T> {
 
     /** */
     @GridToStringExclude
-    private MessageFormatter formatter;
+    private GridNioMessageWriterFactory writerFactory;
 
     /** */
     @GridToStringExclude
@@ -212,7 +211,7 @@ public class GridNioServer<T> {
      * @param directMode Whether direct mode is used.
      * @param daemon Daemon flag to create threads.
      * @param metricsLsnr Metrics listener.
-     * @param formatter Message formatter.
+     * @param writerFactory Writer factory.
      * @param skipRecoveryPred Skip recovery predicate.
      * @param msgQueueLsnr Message queue size listener.
      * @param filters Filters for this server.
@@ -234,7 +233,7 @@ public class GridNioServer<T> {
         boolean directMode,
         boolean daemon,
         GridNioMetricsListener metricsLsnr,
-        MessageFormatter formatter,
+        GridNioMessageWriterFactory writerFactory,
         IgnitePredicate<Message> skipRecoveryPred,
         IgniteBiInClosure<GridNioSession, Integer> msgQueueLsnr,
         GridNioFilter... filters
@@ -304,7 +303,7 @@ public class GridNioServer<T> {
 
         this.directMode = directMode;
         this.metricsLsnr = metricsLsnr;
-        this.formatter = formatter;
+        this.writerFactory = writerFactory;
 
         this.skipRecoveryPred = skipRecoveryPred != null ? skipRecoveryPred : F.<Message>alwaysFalse();
     }
@@ -934,8 +933,14 @@ public class GridNioServer<T> {
 
             MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
 
-            if (writer == null)
-                ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer());
+            if (writer == null) {
+                try {
+                    ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to create message writer.", e);
+                }
+            }
 
             boolean handshakeFinished = sslFilter.lock(ses);
 
@@ -1129,8 +1134,14 @@ public class GridNioServer<T> {
 
             MessageWriter writer = ses.meta(MSG_WRITER.ordinal());
 
-            if (writer == null)
-                ses.addMeta(MSG_WRITER.ordinal(), writer = formatter.writer());
+            if (writer == null) {
+                try {
+                    ses.addMeta(MSG_WRITER.ordinal(), writer = writerFactory.writer(ses));
+                }
+                catch (IgniteCheckedException e) {
+                    throw new IOException("Failed to create message writer.", e);
+                }
+            }
 
             if (req == null) {
                 req = (NioOperationFuture<?>)ses.pollFuture();
@@ -1152,7 +1163,7 @@ public class GridNioServer<T> {
 
                 finished = msg.writeTo(buf, writer);
 
-                if (finished)
+                if (finished && writer != null)
                     writer.reset();
             }
 
@@ -1176,7 +1187,7 @@ public class GridNioServer<T> {
 
                 finished = msg.writeTo(buf, writer);
 
-                if (finished)
+                if (finished && writer != null)
                     writer.reset();
             }
 
@@ -2221,8 +2232,8 @@ public class GridNioServer<T> {
         /** Daemon flag. */
         private boolean daemon;
 
-        /** Message formatter. */
-        private MessageFormatter formatter;
+        /** Writer factory. */
+        private GridNioMessageWriterFactory writerFactory;
 
         /** Skip recovery predicate. */
         private IgnitePredicate<Message> skipRecoveryPred;
@@ -2253,7 +2264,7 @@ public class GridNioServer<T> {
                 directMode,
                 daemon,
                 metricsLsnr,
-                formatter,
+                writerFactory,
                 skipRecoveryPred,
                 msgQueueLsnr,
                 filters != null ? Arrays.copyOf(filters, filters.length) : EMPTY_FILTERS
@@ -2450,11 +2461,11 @@ public class GridNioServer<T> {
         }
 
         /**
-         * @param formatter Message formatter.
+         * @param writerFactory Writer factory.
          * @return This for chaining.
          */
-        public Builder<T> messageFormatter(MessageFormatter formatter) {
-            this.formatter = formatter;
+        public Builder<T> writerFactory(GridNioMessageWriterFactory writerFactory) {
+            this.writerFactory = writerFactory;
 
             return this;
         }
@@ -2479,4 +2490,4 @@ public class GridNioServer<T> {
             return this;
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index 93e789d..ebe86fb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -33,7 +33,6 @@ import org.apache.ignite.internal.util.typedef.internal.U;
 import org.apache.ignite.lang.IgniteInClosure;
 import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
-import org.jetbrains.annotations.Nullable;
 
 /**
  *
@@ -120,16 +119,17 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     }
 
     /** {@inheritDoc} */
-    @Override public synchronized boolean sendMessage(@Nullable UUID nodeId, Message msg,
-        IgniteInClosure<IgniteException> closure)
-        throws IgniteCheckedException {
+    @Override public synchronized boolean sendMessage(UUID nodeId, Message msg,
+        IgniteInClosure<IgniteException> closure) throws IgniteCheckedException {
+        assert nodeId != null;
+
         if (closed())
             throw new IgniteCheckedException("Communication client was closed: " + this);
 
         assert writeBuf.hasArray();
 
         try {
-            int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer());
+            int cnt = U.writeMessageFully(msg, shmem.outputStream(), writeBuf, formatter.writer(nodeId));
 
             metricsLsnr.onBytesSent(cnt);
         }
@@ -154,4 +154,4 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     @Override public String toString() {
         return S.toString(GridShmemCommunicationClient.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
index 4c9e0ee..7bc2a53 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageFormatter.java
@@ -17,6 +17,8 @@
 
 package org.apache.ignite.plugin.extensions.communication;
 
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.plugin.Extension;
 
 /**
@@ -33,16 +35,19 @@ public interface MessageFormatter extends Extension {
     /**
      * Creates new message writer instance.
      *
+     * @param rmtNodeId Remote node ID.
      * @return Message writer.
+     * @throws IgniteCheckedException In case of error.
      */
-    public MessageWriter writer();
+    public MessageWriter writer(UUID rmtNodeId) throws IgniteCheckedException;
 
     /**
      * Creates new message reader instance.
      *
-     * @param factory Message factory.
-     * @param msgCls Message class to read.
+     * @param rmtNodeId Remote node ID.
+     * @param msgFactory Message factory.
      * @return Message reader.
+     * @throws IgniteCheckedException In case of error.
      */
-    public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls);
-}
\ No newline at end of file
+    public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) throws IgniteCheckedException;
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
index d40a384..502c69f 100644
--- a/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/plugin/extensions/communication/MessageReader.java
@@ -39,6 +39,13 @@ public interface MessageReader {
     public void setBuffer(ByteBuffer buf);
 
     /**
+     * Sets type of message currently read.
+     *
+     * @param msgCls Message type.
+     */
+    public void setCurrentReadClass(Class<? extends Message> msgCls);
+
+    /**
      * Callback that must be invoked by a message implementation before message body started decoding.
      *
      * @return {@code True} if reading can proceed, {@code false} otherwise.
@@ -272,4 +279,21 @@ public interface MessageReader {
      * Increments read state.
      */
     public void incrementState();
-}
\ No newline at end of file
+
+    /**
+     * Callback called before inner message is read.
+     */
+    public void beforeInnerMessageRead();
+
+    /**
+     * Callback called after inner message is read.
+     *
+     * @param finished Whether message was fully read.
+     */
+    public void afterInnerMessageRead(boolean finished);
+
+    /**
+     * Resets this reader.
+     */
+    public void reset();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
index cd46de4..1cb202c 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/IgniteSpiAdapter.java
@@ -696,11 +696,11 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
 
             if (msgFormatter0 == null) {
                 msgFormatter0 = new MessageFormatter() {
-                    @Override public MessageWriter writer() {
+                    @Override public MessageWriter writer(UUID rmtNodeId) {
                         throw new IgniteException("Failed to write message, node is not started.");
                     }
 
-                    @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
+                    @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) {
                         throw new IgniteException("Failed to read message, node is not started.");
                     }
                 };
@@ -880,4 +880,4 @@ public abstract class IgniteSpiAdapter implements IgniteSpi, IgniteSpiManagement
             ((IgniteKernal)ignite0).context().timeout().removeTimeoutObject(new GridSpiTimeoutObject(obj));
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
index e8bd8a1..68e2f43 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
@@ -76,7 +76,9 @@ import org.apache.ignite.internal.util.nio.GridConnectionBytesVerifyFilter;
 import org.apache.ignite.internal.util.nio.GridDirectParser;
 import org.apache.ignite.internal.util.nio.GridNioCodecFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
+import org.apache.ignite.internal.util.nio.GridNioMessageReaderFactory;
 import org.apache.ignite.internal.util.nio.GridNioMessageTracker;
+import org.apache.ignite.internal.util.nio.GridNioMessageWriterFactory;
 import org.apache.ignite.internal.util.nio.GridNioMetricsListener;
 import org.apache.ignite.internal.util.nio.GridNioRecoveryDescriptor;
 import org.apache.ignite.internal.util.nio.GridNioServer;
@@ -1575,29 +1577,38 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                MessageFormatter msgFormatter = new MessageFormatter() {
-                    private MessageFormatter impl;
+                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
+                    private MessageFormatter formatter;
 
-                    @Override public MessageWriter writer() {
-                        if (impl == null)
-                            impl = getSpiContext().messageFormatter();
+                    @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
+                        throws IgniteCheckedException {
+                        if (formatter == null)
+                            formatter = getSpiContext().messageFormatter();
 
-                        assert impl != null;
+                        assert formatter != null;
 
-                        return impl.writer();
+                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+
+                        return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
                     }
+                };
 
-                    @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
-                        if (impl == null)
-                            impl = getSpiContext().messageFormatter();
+                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
+                    private MessageFormatter formatter;
 
-                        assert impl != null;
+                    @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
+                        if (formatter == null)
+                            formatter = getSpiContext().messageFormatter();
+
+                        assert formatter != null;
+
+                        UUID rmtNodeId = ses.meta(NODE_ID_META);
 
-                        return impl.reader(factory, msgCls);
+                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
                     }
                 };
 
-                GridDirectParser parser = new GridDirectParser(msgFactory, msgFormatter);
+                GridDirectParser parser = new GridDirectParser(msgFactory, readerFactory);
 
                 IgnitePredicate<Message> skipRecoveryPred = new IgnitePredicate<Message>() {
                     @Override public boolean apply(Message msg) {
@@ -1658,7 +1669,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                         .metricsListener(metricsLsnr)
                         .writeTimeout(sockWriteTimeout)
                         .filters(filters)
-                        .messageFormatter(msgFormatter)
+                        .writerFactory(writerFactory)
                         .skipRecoveryPredicate(skipRecoveryPred)
                         .messageQueueSizeListener(queueSizeMonitor)
                         .build();
@@ -1918,7 +1929,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                     UUID nodeId = null;
 
-                    if (!client.async() && !locNode.version().equals(node.version()))
+                    if (!client.async())
                         nodeId = node.id();
 
                     retry = client.sendMessage(nodeId, msg, ackC);
@@ -2591,7 +2602,7 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
 
                         buf.order(ByteOrder.nativeOrder());
 
-                        boolean written = msg.writeTo(buf, getSpiContext().messageFormatter().writer());
+                        boolean written = msg.writeTo(buf, null);
 
                         assert written;
 
@@ -2932,25 +2943,34 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     }
                 };
 
-                MessageFormatter msgFormatter = new MessageFormatter() {
-                    private MessageFormatter impl;
+                GridNioMessageWriterFactory writerFactory = new GridNioMessageWriterFactory() {
+                    private MessageFormatter formatter;
 
-                    @Override public MessageWriter writer() {
-                        if (impl == null)
-                            impl = getSpiContext().messageFormatter();
+                    @Override public MessageWriter writer(GridNioSession ses) throws IgniteCheckedException {
+                        if (formatter == null)
+                            formatter = getSpiContext().messageFormatter();
 
-                        assert impl != null;
+                        assert formatter != null;
 
-                        return impl.writer();
+                        UUID rmtNodeId = ses.meta(NODE_ID_META);
+
+                        return rmtNodeId != null ? formatter.writer(rmtNodeId) : null;
                     }
+                };
 
-                    @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
-                        if (impl == null)
-                            impl = getSpiContext().messageFormatter();
+                GridNioMessageReaderFactory readerFactory = new GridNioMessageReaderFactory() {
+                    private MessageFormatter formatter;
 
-                        assert impl != null;
+                    @Override public MessageReader reader(GridNioSession ses, MessageFactory msgFactory)
+                        throws IgniteCheckedException {
+                        if (formatter == null)
+                            formatter = getSpiContext().messageFormatter();
+
+                        assert formatter != null;
+
+                        UUID rmtNodeId = ses.meta(NODE_ID_META);
 
-                        return impl.reader(factory, msgCls);
+                        return rmtNodeId != null ? formatter.reader(rmtNodeId, msgFactory) : null;
                     }
                 };
 
@@ -2959,8 +2979,8 @@ public class TcpCommunicationSpi extends IgniteSpiAdapter
                     log,
                     endpoint,
                     srvLsnr,
-                    msgFormatter,
-                    new GridNioCodecFilter(new GridDirectParser(msgFactory, msgFormatter), log, true),
+                    writerFactory,
+                    new GridNioCodecFilter(new GridDirectParser(msgFactory, readerFactory), log, true),
                     new GridConnectionBytesVerifyFilter(log)
                 );
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
index f0f41bf..176973e 100644
--- a/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
+++ b/modules/core/src/main/java/org/apache/ignite/stream/StreamTransformer.java
@@ -29,6 +29,11 @@ import org.apache.ignite.cache.CacheEntryProcessor;
 /**
  * Convenience adapter to transform update existing values in streaming cache
  * based on the previously cached value.
+ * <p>
+ * This transformer implement {@link EntryProcessor} and internally will call
+ * {@link IgniteCache#invoke(Object, EntryProcessor, Object...)} method. Note
+ * that the value received from the data streamer will be passed to the entry
+ * processor as an argument.
  */
 public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, EntryProcessor<K, V, Object> {
     /** */
@@ -37,7 +42,7 @@ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, E
     /** {@inheritDoc} */
     @Override public void receive(IgniteCache<K, V> cache, Collection<Map.Entry<K, V>> entries) throws IgniteException {
         for (Map.Entry<K, V> entry : entries)
-            cache.invoke(entry.getKey(), this);
+            cache.invoke(entry.getKey(), this, entry.getValue());
     }
 
     /**
@@ -53,4 +58,4 @@ public abstract class StreamTransformer<K, V> implements StreamReceiver<K, V>, E
             }
         };
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/2de3b19b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
index 906a050..e257a97 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridSpiTestContext.java
@@ -37,12 +37,12 @@ import org.apache.ignite.events.TaskEvent;
 import org.apache.ignite.internal.ClusterMetricsSnapshot;
 import org.apache.ignite.internal.direct.DirectMessageReader;
 import org.apache.ignite.internal.direct.DirectMessageWriter;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
 import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
 import org.apache.ignite.internal.util.typedef.F;
 import org.apache.ignite.lang.IgniteUuid;
-import org.apache.ignite.plugin.extensions.communication.Message;
 import org.apache.ignite.plugin.extensions.communication.MessageFactory;
 import org.apache.ignite.plugin.extensions.communication.MessageFormatter;
 import org.apache.ignite.plugin.extensions.communication.MessageReader;
@@ -492,12 +492,12 @@ public class GridSpiTestContext implements IgniteSpiContext {
     @Override public MessageFormatter messageFormatter() {
         if (formatter == null) {
             formatter = new MessageFormatter() {
-                @Override public MessageWriter writer() {
-                    return new DirectMessageWriter();
+                @Override public MessageWriter writer(UUID rmtNodeId) {
+                    return new DirectMessageWriter(GridIoManager.DIRECT_PROTO_VER);
                 }
 
-                @Override public MessageReader reader(MessageFactory factory, Class<? extends Message> msgCls) {
-                    return new DirectMessageReader(factory, this);
+                @Override public MessageReader reader(UUID rmtNodeId, MessageFactory msgFactory) {
+                    return new DirectMessageReader(msgFactory, GridIoManager.DIRECT_PROTO_VER);
                 }
             };
         }
@@ -573,4 +573,4 @@ public class GridSpiTestContext implements IgniteSpiContext {
             this.obj = obj;
         }
     }
-}
\ No newline at end of file
+}