You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:25 UTC

[02/50] [abbrv] ignite git commit: IGNITE-3054 - Add byte buffer stream

IGNITE-3054 - Add byte buffer stream


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ef1f288c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ef1f288c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ef1f288c

Branch: refs/heads/ignite-3054
Commit: ef1f288cc923f5d5edbe5416cbc624f32831911c
Parents: f9d73ec
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Oct 25 16:27:05 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Oct 25 16:27:05 2016 +0300

----------------------------------------------------------------------
 .../ignite/spi/discovery/tcp/ServerImpl.java    | 91 +++++++++++++++++---
 1 file changed, 77 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1f288c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7499aed..29f613a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -85,7 +85,6 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.future.GridFinishedFuture;
 import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.io.GridByteArrayInputStream;
 import org.apache.ignite.internal.util.lang.GridTuple;
 import org.apache.ignite.internal.util.nio.GridNioFilter;
 import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
@@ -151,6 +150,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient
 import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
 import org.apache.ignite.thread.IgniteThread;
 import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.jsr166.ConcurrentHashMap8;
 
@@ -6143,6 +6143,8 @@ class ServerImpl extends TcpDiscoveryImpl {
 
             final ByteBuffer buf = (ByteBuffer)msg;
 
+            ByteBufferInputStream in = null;
+
             if (msgBuf == null || msgBuf.position() == 0) {
                 // first packet
                 final int msgLen = getMessageLength(buf, ses);
@@ -6150,35 +6152,41 @@ class ServerImpl extends TcpDiscoveryImpl {
                 if (msgLen == -1)
                     return;
 
-                msgBuf = enlargeIfNeed(msgBuf, msgLen);
+                if (msgLen <= buf.remaining())
+                    in = new ByteBufferInputStream(buf, msgLen, false);
+                else {
+                    msgBuf = enlargeIfNeed(msgBuf, msgLen);
 
-                ses.addMeta(INCOMPLETE_MESSAGE_META, msgBuf);
+                    ses.addMeta(INCOMPLETE_MESSAGE_META, msgBuf);
+                }
             }
 
-            final int left = buf.remaining();
+            if (in == null) {
+                final int left = buf.remaining();
 
-            buf.get(msgBuf.array(), msgBuf.position(), Math.min(left, msgBuf.remaining()));
+                buf.get(msgBuf.array(), msgBuf.position(), Math.min(left, msgBuf.remaining()));
 
-            final int read = left - buf.remaining();
+                final int read = left - buf.remaining();
 
-            msgBuf.position(msgBuf.position() + read);
+                msgBuf.position(msgBuf.position() + read);
 
-            if (!msgBuf.hasRemaining()) {
-                // unmarshal and process
-                GridByteArrayInputStream in = null;
+                if (!msgBuf.hasRemaining()) {
+                    msgBuf.rewind();
 
+                    in = new ByteBufferInputStream(msgBuf, msgBuf.limit(), true);
+                }
+            }
+
+            if (in != null) {
+                // unmarshal and process
                 final Object obj;
 
                 try {
-                    in = new GridByteArrayInputStream(msgBuf.array(), 0, msgBuf.position());
-
                     obj = spi.marshaller().unmarshal(in,
                         U.resolveClassLoader(spi.ignite().configuration()));
                 }
                 finally {
                     U.closeQuiet(in);
-
-                    msgBuf.clear();
                 }
 
                 proceedMessageReceived(ses, obj);
@@ -7889,4 +7897,59 @@ class ServerImpl extends TcpDiscoveryImpl {
 
         return buf;
     }
+
+    /**
+     * Input stream over a {@link ByteBuffer} window of {@code len} bytes starting at the buffer's current position.
+     */
+    private static class ByteBufferInputStream extends InputStream {
+        /** Backing buffer; its limit is narrowed to the readable window while the stream is in use. */
+        private final ByteBuffer buf;
+
+        /** Limit the buffer had before this stream narrowed it; restored on close when not clearing. */
+        private final int oldLimit;
+
+        /** If {@code true} the buffer is cleared on close, otherwise its previous limit is restored. */
+        private final boolean clearOnClose;
+
+        /**
+         * @param buf Byte buffer.
+         * @param len Length that will be set as buf.limit(buf.position() + len).
+         * @param clearOnClose Clear buffer on close.
+         */
+        private ByteBufferInputStream(final ByteBuffer buf, final int len, final boolean clearOnClose) {
+            this.buf = buf;
+            this.oldLimit = buf.limit();
+            this.clearOnClose = clearOnClose;
+
+            buf.limit(buf.position() + len);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read() throws IOException {
+            if (!buf.hasRemaining())
+                return -1;
+
+            return buf.get() & 0xFF;
+        }
+
+        /** {@inheritDoc} */
+        @Override public int read(@NotNull final byte[] b, final int off, final int len) throws IOException {
+            int toRead = Math.min(len, buf.remaining());
+            // A zero-length request must yield 0 (InputStream contract); -1 is reserved for end of data.
+            if (toRead == 0)
+                return len == 0 ? 0 : -1;
+
+            buf.get(b, off, toRead);
+
+            return toRead;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void close() throws IOException {
+            if (clearOnClose)
+                buf.clear();
+            else
+                buf.limit(oldLimit);
+        }
+    }
 }