You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2017/04/24 15:03:25 UTC
[02/50] [abbrv] ignite git commit: IGNITE-3054 - Add byte buffer stream
IGNITE-3054 - Add byte buffer stream
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/ef1f288c
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/ef1f288c
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/ef1f288c
Branch: refs/heads/ignite-3054
Commit: ef1f288cc923f5d5edbe5416cbc624f32831911c
Parents: f9d73ec
Author: dkarachentsev <dk...@gridgain.com>
Authored: Tue Oct 25 16:27:05 2016 +0300
Committer: dkarachentsev <dk...@gridgain.com>
Committed: Tue Oct 25 16:27:05 2016 +0300
----------------------------------------------------------------------
.../ignite/spi/discovery/tcp/ServerImpl.java | 91 +++++++++++++++++---
1 file changed, 77 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/ef1f288c/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7499aed..29f613a 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -85,7 +85,6 @@ import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.future.GridFinishedFuture;
import org.apache.ignite.internal.util.future.GridFutureAdapter;
-import org.apache.ignite.internal.util.io.GridByteArrayInputStream;
import org.apache.ignite.internal.util.lang.GridTuple;
import org.apache.ignite.internal.util.nio.GridNioFilter;
import org.apache.ignite.internal.util.nio.GridNioFilterAdapter;
@@ -151,6 +150,7 @@ import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryRedirectToClient
import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryStatusCheckMessage;
import org.apache.ignite.thread.IgniteThread;
import org.apache.ignite.thread.IgniteThreadPoolExecutor;
+import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
@@ -6143,6 +6143,8 @@ class ServerImpl extends TcpDiscoveryImpl {
final ByteBuffer buf = (ByteBuffer)msg;
+ ByteBufferInputStream in = null;
+
if (msgBuf == null || msgBuf.position() == 0) {
// first packet
final int msgLen = getMessageLength(buf, ses);
@@ -6150,35 +6152,41 @@ class ServerImpl extends TcpDiscoveryImpl {
if (msgLen == -1)
return;
- msgBuf = enlargeIfNeed(msgBuf, msgLen);
+ if (msgLen <= buf.remaining())
+ in = new ByteBufferInputStream(buf, msgLen, false);
+ else {
+ msgBuf = enlargeIfNeed(msgBuf, msgLen);
- ses.addMeta(INCOMPLETE_MESSAGE_META, msgBuf);
+ ses.addMeta(INCOMPLETE_MESSAGE_META, msgBuf);
+ }
}
- final int left = buf.remaining();
+ if (in == null) {
+ final int left = buf.remaining();
- buf.get(msgBuf.array(), msgBuf.position(), Math.min(left, msgBuf.remaining()));
+ buf.get(msgBuf.array(), msgBuf.position(), Math.min(left, msgBuf.remaining()));
- final int read = left - buf.remaining();
+ final int read = left - buf.remaining();
- msgBuf.position(msgBuf.position() + read);
+ msgBuf.position(msgBuf.position() + read);
- if (!msgBuf.hasRemaining()) {
- // unmarshal and process
- GridByteArrayInputStream in = null;
+ if (!msgBuf.hasRemaining()) {
+ msgBuf.rewind();
+ in = new ByteBufferInputStream(msgBuf, msgBuf.limit(), true);
+ }
+ }
+
+ if (in != null) {
+ // unmarshal and process
final Object obj;
try {
- in = new GridByteArrayInputStream(msgBuf.array(), 0, msgBuf.position());
-
obj = spi.marshaller().unmarshal(in,
U.resolveClassLoader(spi.ignite().configuration()));
}
finally {
U.closeQuiet(in);
-
- msgBuf.clear();
}
proceedMessageReceived(ses, obj);
@@ -7889,4 +7897,59 @@ class ServerImpl extends TcpDiscoveryImpl {
return buf;
}
+
+ /**
+ * Input stream that reads straight from a {@link ByteBuffer}, limited to {@code len} bytes starting at the buffer's current position.
+ */
+ private static class ByteBufferInputStream extends InputStream {
+ /** Buffer the stream reads from. */
+ private final ByteBuffer buf;
+
+ /** Limit the buffer had at construction time; restored on close unless the buffer is cleared. */
+ private final int oldLimit;
+
+ /** Whether {@link #close()} clears the buffer instead of restoring its original limit. */
+ private final boolean clearOnClose;
+
+ /**
+ * @param buf Byte buffer.
+ * @param len Length that will be set as buf.limit(buf.position() + len).
+ * @param clearOnClose Clear buffer on close.
+ */
+ private ByteBufferInputStream(final ByteBuffer buf, final int len, final boolean clearOnClose) {
+ this.buf = buf;
+ this.oldLimit = buf.limit();
+ this.clearOnClose = clearOnClose;
+ // Narrow the readable window to exactly len bytes past the current position.
+ buf.limit(buf.position() + len);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read() throws IOException {
+ if (!buf.hasRemaining())
+ return -1;
+
+ return buf.get() & 0xFF;
+ }
+
+ /** {@inheritDoc} Returns 0 for a zero-length request and -1 only when no bytes remain in the window. */
+ @Override public int read(@NotNull final byte[] b, final int off, final int len) throws IOException {
+ final int toRead = Math.min(len, buf.remaining());
+
+ if (toRead == 0)
+ return len == 0 ? 0 : -1; // A zero-length request must not be reported as end of stream.
+
+ buf.get(b, off, toRead);
+
+ return toRead;
+ }
+
+ /** {@inheritDoc} Either clears the buffer or restores the limit it had before this stream was created. */
+ @Override public void close() throws IOException {
+ if (clearOnClose)
+ buf.clear();
+ else
+ buf.limit(oldLimit);
+ }
+ }
}