You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2016/12/09 11:27:40 UTC

[18/25] ignite git commit: IGNITE-3220 I/O bottleneck on server/client cluster configuration Communications optimizations: - possibility to open separate in/out connections - possibility to have multiple connections between nodes - implemented NIO sessio

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
index 63c9845..66f9176 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridSelectorNioSessionImpl.java
@@ -20,9 +20,11 @@ package org.apache.ignite.internal.util.nio;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.IgniteLogger;
 import org.apache.ignite.internal.util.tostring.GridToStringExclude;
 import org.apache.ignite.internal.util.typedef.internal.LT;
@@ -37,17 +39,14 @@ import org.jsr166.ConcurrentLinkedDeque8;
  */
 class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Pending write requests. */
-    private final ConcurrentLinkedDeque8<GridNioFuture<?>> queue = new ConcurrentLinkedDeque8<>();
+    private final ConcurrentLinkedDeque8<SessionWriteRequest> queue = new ConcurrentLinkedDeque8<>();
 
     /** Selection key associated with this session. */
     @GridToStringExclude
     private SelectionKey key;
 
-    /** Worker index for server */
-    private final int selectorIdx;
-
-    /** Size counter. */
-    private final AtomicInteger queueSize = new AtomicInteger();
+    /** Current worker thread. */
+    private volatile GridNioWorker worker;
 
     /** Semaphore. */
     @GridToStringExclude
@@ -59,17 +58,29 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     /** Read buffer. */
     private ByteBuffer readBuf;
 
-    /** Recovery data. */
-    private GridNioRecoveryDescriptor recovery;
+    /** Incoming recovery data. */
+    private GridNioRecoveryDescriptor inRecovery;
+
+    /** Outgoing recovery data. */
+    private GridNioRecoveryDescriptor outRecovery;
 
     /** Logger. */
     private final IgniteLogger log;
 
+    /** */
+    private List<GridNioServer.SessionChangeRequest> pendingStateChanges;
+
+    /** */
+    final AtomicBoolean procWrite = new AtomicBoolean();
+
+    /** */
+    private Object sysMsg;
+
     /**
      * Creates session instance.
      *
      * @param log Logger.
-     * @param selectorIdx Selector index for this session.
+     * @param worker NIO worker thread.
      * @param filterChain Filter chain that will handle requests.
      * @param locAddr Local address.
      * @param rmtAddr Remote address.
@@ -80,7 +91,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      */
     GridSelectorNioSessionImpl(
         IgniteLogger log,
-        int selectorIdx,
+        GridNioWorker worker,
         GridNioFilterChain filterChain,
         InetSocketAddress locAddr,
         InetSocketAddress rmtAddr,
@@ -91,7 +102,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     ) {
         super(filterChain, locAddr, rmtAddr, accepted);
 
-        assert selectorIdx >= 0;
+        assert worker != null;
         assert sndQueueLimit >= 0;
 
         assert locAddr != null : "GridSelectorNioSessionImpl should have local socket address.";
@@ -101,7 +112,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         this.log = log;
 
-        this.selectorIdx = selectorIdx;
+        this.worker = worker;
 
         sem = sndQueueLimit > 0 ? new Semaphore(sndQueueLimit) : null;
 
@@ -119,12 +130,19 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /**
+     * @return Worker.
+     */
+    GridNioWorker worker() {
+        return worker;
+    }
+
+    /**
      * Sets selection key for this session.
      *
      * @param key Selection key.
      */
     void key(SelectionKey key) {
-        assert this.key == null;
+        assert key != null;
 
         this.key = key;
     }
@@ -151,10 +169,88 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /**
-     * @return Selector index.
+     * @param from Current session worker.
+     * @param fut Move future.
+     * @return {@code True} if session move was scheduled.
+     */
+    boolean offerMove(GridNioWorker from, GridNioServer.SessionChangeRequest fut) {
+        synchronized (this) {
+            if (log.isDebugEnabled())
+                log.debug("Offered move [ses=" + this + ", fut=" + fut + ']');
+
+            GridNioWorker worker0 = worker;
+
+            if (worker0 != from)
+                return false;
+
+            worker.offer(fut);
+        }
+
+        return true;
+    }
+
+    /**
+     * @param fut Future.
+     */
+    void offerStateChange(GridNioServer.SessionChangeRequest fut) {
+        synchronized (this) {
+            if (log.isDebugEnabled())
+                log.debug("Offered move [ses=" + this + ", fut=" + fut + ']');
+
+            GridNioWorker worker0 = worker;
+
+            if (worker0 == null) {
+                if (pendingStateChanges == null)
+                    pendingStateChanges = new ArrayList<>();
+
+                pendingStateChanges.add(fut);
+            }
+            else
+                worker0.offer(fut);
+        }
+    }
+
+    /**
+     * @param moveFrom Current session worker.
      */
-    int selectorIndex() {
-        return selectorIdx;
+    void startMoveSession(GridNioWorker moveFrom) {
+        synchronized (this) {
+            assert this.worker == moveFrom;
+
+            if (log.isDebugEnabled())
+                log.debug("Started moving [ses=" + this + ", from=" + moveFrom + ']');
+
+            List<GridNioServer.SessionChangeRequest> sesReqs = moveFrom.clearSessionRequests(this);
+
+            worker = null;
+
+            if (sesReqs != null) {
+                if (pendingStateChanges == null)
+                    pendingStateChanges = new ArrayList<>();
+
+                pendingStateChanges.addAll(sesReqs);
+            }
+        }
+    }
+
+    /**
+     * @param moveTo New session worker.
+     */
+    void finishMoveSession(GridNioWorker moveTo) {
+        synchronized (this) {
+            assert worker == null;
+
+            if (log.isDebugEnabled())
+                log.debug("Finishing moving [ses=" + this + ", to=" + moveTo + ']');
+
+            worker = moveTo;
+
+            if (pendingStateChanges != null) {
+                moveTo.offer(pendingStateChanges);
+
+                pendingStateChanges = null;
+            }
+        }
     }
 
     /**
@@ -163,14 +259,14 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param writeFut Write request.
      * @return Updated size of the queue.
      */
-    int offerSystemFuture(GridNioFuture<?> writeFut) {
+    int offerSystemFuture(SessionWriteRequest writeFut) {
         writeFut.messageThread(true);
 
         boolean res = queue.offerFirst(writeFut);
 
         assert res : "Future was not added to queue";
 
-        return queueSize.incrementAndGet();
+        return queue.sizex();
     }
 
     /**
@@ -183,7 +279,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param writeFut Write request to add.
      * @return Updated size of the queue.
      */
-    int offerFuture(GridNioFuture<?> writeFut) {
+    int offerFuture(SessionWriteRequest writeFut) {
         boolean msgThread = GridNioBackPressureControl.threadProcessingMessage();
 
         if (sem != null && !msgThread)
@@ -195,47 +291,41 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
 
         assert res : "Future was not added to queue";
 
-        return queueSize.incrementAndGet();
+        return queue.sizex();
     }
 
     /**
      * @param futs Futures to resend.
      */
-    void resend(Collection<GridNioFuture<?>> futs) {
+    void resend(Collection<SessionWriteRequest> futs) {
         assert queue.isEmpty() : queue.size();
 
         boolean add = queue.addAll(futs);
 
         assert add;
-
-        boolean set = queueSize.compareAndSet(0, futs.size());
-
-        assert set;
     }
 
     /**
      * @return Message that is in the head of the queue, {@code null} if queue is empty.
      */
-    @Nullable GridNioFuture<?> pollFuture() {
-        GridNioFuture<?> last = queue.poll();
+    @Nullable SessionWriteRequest pollFuture() {
+        SessionWriteRequest last = queue.poll();
 
         if (last != null) {
-            queueSize.decrementAndGet();
-
             if (sem != null && !last.messageThread())
                 sem.release();
 
-            if (recovery != null) {
-                if (!recovery.add(last)) {
+            if (outRecovery != null) {
+                if (!outRecovery.add(last)) {
                     LT.warn(log, "Unacknowledged messages queue size overflow, will attempt to reconnect " +
                         "[remoteAddr=" + remoteAddress() +
-                        ", queueLimit=" + recovery.queueLimit() + ']');
+                        ", queueLimit=" + outRecovery.queueLimit() + ']');
 
                     if (log.isDebugEnabled())
                         log.debug("Unacknowledged messages queue size overflow, will attempt to reconnect " +
                             "[remoteAddr=" + remoteAddress() +
-                            ", queueSize=" + recovery.messagesFutures().size() +
-                            ", queueLimit=" + recovery.queueLimit() + ']');
+                            ", queueSize=" + outRecovery.messagesRequests().size() +
+                            ", queueLimit=" + outRecovery.queueLimit() + ']');
 
                     close();
                 }
@@ -249,7 +339,7 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @param fut Future.
      * @return {@code True} if future was removed from queue.
      */
-    boolean removeFuture(GridNioFuture<?> fut) {
+    boolean removeFuture(SessionWriteRequest fut) {
         assert closed();
 
         return queue.removeLastOccurrence(fut);
@@ -261,35 +351,49 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
      * @return Number of write requests.
      */
     int writeQueueSize() {
-        return queueSize.get();
+        return queue.sizex();
     }
 
     /**
      * @return Write requests.
      */
-    Collection<GridNioFuture<?>> writeQueue() {
+    Collection<SessionWriteRequest> writeQueue() {
         return queue;
     }
 
     /** {@inheritDoc} */
-    @Override public void recoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+    @Override public void outRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
         assert recoveryDesc != null;
 
-        recovery = recoveryDesc;
+        outRecovery = recoveryDesc;
     }
 
     /** {@inheritDoc} */
-    @Nullable @Override public GridNioRecoveryDescriptor recoveryDescriptor() {
-        return recovery;
+    @Nullable @Override public GridNioRecoveryDescriptor outRecoveryDescriptor() {
+        return outRecovery;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void inRecoveryDescriptor(GridNioRecoveryDescriptor recoveryDesc) {
+        assert recoveryDesc != null;
+
+        inRecovery = recoveryDesc;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public GridNioRecoveryDescriptor inRecoveryDescriptor() {
+        return inRecovery;
     }
 
     /** {@inheritDoc} */
     @Override public <T> T addMeta(int key, @Nullable T val) {
-        if (val instanceof GridNioRecoveryDescriptor) {
-            recovery = (GridNioRecoveryDescriptor)val;
+        if (!accepted() && val instanceof GridNioRecoveryDescriptor) {
+            outRecovery = (GridNioRecoveryDescriptor)val;
+
+            if (!outRecovery.pairedConnections())
+                inRecovery = outRecovery;
 
-            if (!accepted())
-                recovery.connected();
+            outRecovery.onConnected();
 
             return null;
         }
@@ -313,6 +417,31 @@ class GridSelectorNioSessionImpl extends GridNioSessionImpl {
     }
 
     /** {@inheritDoc} */
+    @Override public void systemMessage(Object sysMsg) {
+        this.sysMsg = sysMsg;
+    }
+
+    /**
+     * @return {@code True} if have pending system message to send.
+     */
+    boolean hasSystemMessage() {
+        return sysMsg != null;
+    }
+
+    /**
+     * Gets and clears pending system message.
+     *
+     * @return Pending system message.
+     */
+    Object systemMessage() {
+        Object ret = sysMsg;
+
+        sysMsg = null;
+
+        return ret;
+    }
+
+    /** {@inheritDoc} */
     @Override public String toString() {
         return S.toString(GridSelectorNioSessionImpl.class, this, super.toString());
     }

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
index ebe86fb..d941bae 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridShmemCommunicationClient.java
@@ -48,6 +48,7 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
     private final MessageFormatter formatter;
 
     /**
+     * @param connIdx Connection index.
      * @param metricsLsnr Metrics listener.
      * @param port Shared memory IPC server port.
      * @param connTimeout Connection timeout.
@@ -55,14 +56,16 @@ public class GridShmemCommunicationClient extends GridAbstractCommunicationClien
      * @param formatter Message formatter.
      * @throws IgniteCheckedException If failed.
      */
-    public GridShmemCommunicationClient(GridNioMetricsListener metricsLsnr,
+    public GridShmemCommunicationClient(
+        int connIdx,
+        GridNioMetricsListener metricsLsnr,
         int port,
         long connTimeout,
         IgniteLogger log,
         MessageFormatter formatter)
         throws IgniteCheckedException
     {
-        super(metricsLsnr);
+        super(connIdx, metricsLsnr);
 
         assert metricsLsnr != null;
         assert port > 0 && port < 0xffff;

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
index 5fe521d..3397772 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/GridTcpNioCommunicationClient.java
@@ -45,11 +45,16 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     private final IgniteLogger log;
 
     /**
+     * @param connIdx Connection index.
      * @param ses Session.
      * @param log Logger.
      */
-    public GridTcpNioCommunicationClient(GridNioSession ses, IgniteLogger log) {
-        super(null);
+    public GridTcpNioCommunicationClient(
+        int connIdx,
+        GridNioSession ses,
+        IgniteLogger log
+    ) {
+        super(connIdx, null);
 
         assert ses != null;
         assert log != null;
@@ -104,40 +109,36 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     }
 
     /** {@inheritDoc} */
-    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> closure)
+    @Override public boolean sendMessage(@Nullable UUID nodeId, Message msg, IgniteInClosure<IgniteException> c)
         throws IgniteCheckedException {
-        // Node ID is never provided in asynchronous send mode.
-        assert nodeId == null;
+        try {
+            // Node ID is never provided in asynchronous send mode.
+            assert nodeId == null;
 
-        if (closure != null)
-            ses.addMeta(ACK_CLOSURE.ordinal(), closure);
+            if (c != null)
+                ses.addMeta(ACK_CLOSURE.ordinal(), c);
 
-        GridNioFuture<?> fut = ses.send(msg);
+            ses.sendNoFuture(msg);
 
-        if (fut.isDone()) {
-            try {
-                fut.get();
-            }
-            catch (IgniteCheckedException e) {
-                if (closure != null)
-                    ses.removeMeta(ACK_CLOSURE.ordinal());
+            if (c != null)
+                ses.removeMeta(ACK_CLOSURE.ordinal());
+        }
+        catch (IgniteCheckedException e) {
+            if (c != null)
+                ses.removeMeta(ACK_CLOSURE.ordinal());
 
-                if (log.isDebugEnabled())
-                    log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
+            if (log.isDebugEnabled())
+                log.debug("Failed to send message [client=" + this + ", err=" + e + ']');
 
-                if (e.getCause() instanceof IOException) {
-                    ses.close();
+            if (e.getCause() instanceof IOException) {
+                ses.close();
 
-                    return true;
-                }
-                else
-                    throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
+                return true;
             }
+            else
+                throw new IgniteCheckedException("Failed to send message [client=" + this + ']', e);
         }
 
-        if (closure != null)
-            ses.removeMeta(ACK_CLOSURE.ordinal());
-
         return false;
     }
 
@@ -159,4 +160,4 @@ public class GridTcpNioCommunicationClient extends GridAbstractCommunicationClie
     @Override public String toString() {
         return S.toString(GridTcpNioCommunicationClient.class, this, super.toString());
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
new file mode 100644
index 0000000..508c791
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/SessionWriteRequest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util.nio;
+
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.lang.IgniteInClosure;
+
+/**
+ *
+ */
+public interface SessionWriteRequest {
+    /**
+     * Sets flag indicating that message send future was created in thread that was processing a message.
+     *
+     * @param msgThread {@code True} if future was created in thread that is processing message.
+     */
+    public void messageThread(boolean msgThread);
+
+    /**
+     * @return {@code True} if future was created in thread that was processing message.
+     */
+    public boolean messageThread();
+
+    /**
+     * @return {@code True} if skip recovery for this operation.
+     */
+    public boolean skipRecovery();
+
+    /**
+     * Sets ack closure which will be applied when ack received.
+     *
+     * @param c Ack closure.
+     */
+    public void ackClosure(IgniteInClosure<IgniteException> c);
+
+    /**
+     * The method will be called when ack received.
+     */
+    public void onAckReceived();
+
+    /**
+     * @return Ack closure.
+     */
+    public IgniteInClosure<IgniteException> ackClosure();
+
+    /**
+     * @return Session.
+     */
+    public GridNioSession session();
+
+    /**
+     * @param ses Session.
+     */
+    public void resetSession(GridNioSession ses);
+
+    /**
+     *
+     */
+    public void onError(Exception e);
+
+    /**
+     * @return Message.
+     */
+    public Object message();
+
+    /**
+     *
+     */
+    public void onMessageWritten();
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
index d6f9d10..8ed7db0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslFilter.java
@@ -282,9 +282,13 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
     }
 
     /** {@inheritDoc} */
-    @Override public GridNioFuture<?> onSessionWrite(GridNioSession ses, Object msg) throws IgniteCheckedException {
+    @Override public GridNioFuture<?> onSessionWrite(
+        GridNioSession ses,
+        Object msg,
+        boolean fut
+    ) throws IgniteCheckedException {
         if (directMode)
-            return proceedSessionWrite(ses, msg);
+            return proceedSessionWrite(ses, msg, fut);
 
         ByteBuffer input = checkMessage(ses, msg);
 
@@ -441,4 +445,4 @@ public class GridNioSslFilter extends GridNioFilterAdapter {
 
         return (ByteBuffer)msg;
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
index eb8dad4..269e8b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/nio/ssl/GridNioSslHandler.java
@@ -437,7 +437,7 @@ class GridNioSslHandler extends ReentrantLock {
         while (!deferredWriteQueue.isEmpty()) {
             WriteRequest req = deferredWriteQueue.poll();
 
-            req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer()));
+            req.future().onDone((GridNioFuture<Object>)parent.proceedSessionWrite(ses, req.buffer(), true));
         }
     }
 
@@ -482,7 +482,7 @@ class GridNioSslHandler extends ReentrantLock {
 
         ByteBuffer cp = copy(outNetBuf);
 
-        return parent.proceedSessionWrite(ses, cp);
+        return parent.proceedSessionWrite(ses, cp, true);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/10ade284/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
index b29d7cd..86aa7a1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/util/tostring/GridToStringBuilder.java
@@ -598,4 +598,4 @@ public class GridToStringBuilder {
 
         return cd;
     }
-}
\ No newline at end of file
+}