You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vk...@apache.org on 2014/12/23 00:39:56 UTC
[02/20] incubator-ignite git commit: IGNITE-61 - Portable format in direct marshalling
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java
index 77d188b..46975bb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFileAffinityRange.java
@@ -281,7 +281,7 @@ public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapte
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -289,31 +289,31 @@ public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapte
switch (commState.idx) {
case 0:
- if (!commState.putGridUuid(affKey))
+ if (!commState.putGridUuid(null, affKey))
return false;
commState.idx++;
case 1:
- if (!commState.putBoolean(done))
+ if (!commState.putBoolean(null, done))
return false;
commState.idx++;
case 2:
- if (!commState.putLong(endOff))
+ if (!commState.putLong(null, endOff))
return false;
commState.idx++;
case 3:
- if (!commState.putLong(startOff))
+ if (!commState.putLong(null, startOff))
return false;
commState.idx++;
case 4:
- if (!commState.putInt(status))
+ if (!commState.putInt(null, status))
return false;
commState.idx++;
@@ -330,7 +330,7 @@ public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapte
switch (commState.idx) {
case 0:
- IgniteUuid affKey0 = commState.getGridUuid();
+ IgniteUuid affKey0 = commState.getGridUuid(null);
if (affKey0 == GRID_UUID_NOT_READ)
return false;
@@ -343,7 +343,7 @@ public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapte
if (buf.remaining() < 1)
return false;
- done = commState.getBoolean();
+ done = commState.getBoolean(null);
commState.idx++;
@@ -351,7 +351,7 @@ public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapte
if (buf.remaining() < 8)
return false;
- endOff = commState.getLong();
+ endOff = commState.getLong(null);
commState.idx++;
@@ -359,7 +359,7 @@ public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapte
if (buf.remaining() < 8)
return false;
- startOff = commState.getLong();
+ startOff = commState.getLong(null);
commState.idx++;
@@ -367,7 +367,7 @@ public class GridGgfsFileAffinityRange extends GridTcpCommunicationMessageAdapte
if (buf.remaining() < 4)
return false;
- status = commState.getInt();
+ status = commState.getInt(null);
commState.idx++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java
index d6329cc..80b4621 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerRequest.java
@@ -99,7 +99,7 @@ public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
return false;
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -107,7 +107,7 @@ public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
switch (commState.idx) {
case 0:
- if (!commState.putGridUuid(fileId))
+ if (!commState.putGridUuid(null, fileId))
return false;
commState.idx++;
@@ -115,7 +115,7 @@ public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
case 1:
if (fragmentRanges != null) {
if (commState.it == null) {
- if (!commState.putInt(fragmentRanges.size()))
+ if (!commState.putInt(null, fragmentRanges.size()))
return false;
commState.it = fragmentRanges.iterator();
@@ -125,7 +125,7 @@ public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
if (commState.cur == NULL)
commState.cur = commState.it.next();
- if (!commState.putMessage((GridGgfsFileAffinityRange)commState.cur))
+ if (!commState.putMessage(null, (GridGgfsFileAffinityRange)commState.cur))
return false;
commState.cur = NULL;
@@ -133,7 +133,7 @@ public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
commState.it = null;
} else {
- if (!commState.putInt(-1))
+ if (!commState.putInt(null, -1))
return false;
}
@@ -154,7 +154,7 @@ public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
switch (commState.idx) {
case 0:
- IgniteUuid fileId0 = commState.getGridUuid();
+ IgniteUuid fileId0 = commState.getGridUuid(null);
if (fileId0 == GRID_UUID_NOT_READ)
return false;
@@ -168,7 +168,7 @@ public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
if (buf.remaining() < 4)
return false;
- commState.readSize = commState.getInt();
+ commState.readSize = commState.getInt(null);
}
if (commState.readSize >= 0) {
@@ -176,7 +176,7 @@ public class GridGgfsFragmentizerRequest extends GridGgfsCommunicationMessage {
fragmentRanges = new ArrayList<>(commState.readSize);
for (int i = commState.readItems; i < commState.readSize; i++) {
- Object _val = commState.getMessage();
+ Object _val = commState.getMessage(null);
if (_val == MSG_NOT_READ)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java
index 18024b3..9d71c9c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsFragmentizerResponse.java
@@ -74,7 +74,7 @@ public class GridGgfsFragmentizerResponse extends GridGgfsCommunicationMessage {
return false;
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -82,7 +82,7 @@ public class GridGgfsFragmentizerResponse extends GridGgfsCommunicationMessage {
switch (commState.idx) {
case 0:
- if (!commState.putGridUuid(fileId))
+ if (!commState.putGridUuid(null, fileId))
return false;
commState.idx++;
@@ -102,7 +102,7 @@ public class GridGgfsFragmentizerResponse extends GridGgfsCommunicationMessage {
switch (commState.idx) {
case 0:
- IgniteUuid fileId0 = commState.getGridUuid();
+ IgniteUuid fileId0 = commState.getGridUuid(null);
if (fileId0 == GRID_UUID_NOT_READ)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java
index f1ab657..06fbcef 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/ggfs/GridGgfsSyncMessage.java
@@ -92,7 +92,7 @@ public class GridGgfsSyncMessage extends GridGgfsCommunicationMessage {
return false;
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -100,13 +100,13 @@ public class GridGgfsSyncMessage extends GridGgfsCommunicationMessage {
switch (commState.idx) {
case 0:
- if (!commState.putLong(order))
+ if (!commState.putLong(null, order))
return false;
commState.idx++;
case 1:
- if (!commState.putBoolean(res))
+ if (!commState.putBoolean(null, res))
return false;
commState.idx++;
@@ -129,7 +129,7 @@ public class GridGgfsSyncMessage extends GridGgfsCommunicationMessage {
if (buf.remaining() < 8)
return false;
- order = commState.getLong();
+ order = commState.getLong(null);
commState.idx++;
@@ -137,7 +137,7 @@ public class GridGgfsSyncMessage extends GridGgfsCommunicationMessage {
if (buf.remaining() < 1)
return false;
- res = commState.getBoolean();
+ res = commState.getBoolean(null);
commState.idx++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/portable/GridPortableOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/portable/GridPortableOutputStream.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/portable/GridPortableOutputStream.java
index c72ddd7..a71be26 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/portable/GridPortableOutputStream.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/portable/GridPortableOutputStream.java
@@ -151,13 +151,6 @@ public interface GridPortableOutputStream extends GridPortableStream, AutoClosea
public void write(long addr, int cnt);
/**
- * Ensure capacity.
- *
- * @param cnt Required byte count.
- */
- public void ensureCapacity(int cnt);
-
- /**
* Close the stream releasing resources.
*/
@Override public void close();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java
index 7adca40..601428c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeRequestWrapper.java
@@ -54,7 +54,7 @@ public class GridClientHandshakeRequestWrapper extends GridTcpCommunicationMessa
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -62,7 +62,7 @@ public class GridClientHandshakeRequestWrapper extends GridTcpCommunicationMessa
switch (commState.idx) {
case 0:
- if (!commState.putByteArrayClient(bytes))
+ if (!commState.putByteArray(null, bytes))
return false;
commState.idx++;
@@ -78,12 +78,12 @@ public class GridClientHandshakeRequestWrapper extends GridTcpCommunicationMessa
switch (commState.idx) {
case 0:
- byte[] bytes0 = commState.getByteArrayClient(GridClientHandshakeRequest.PACKET_SIZE);
-
- if (bytes0 == BYTE_ARR_NOT_READ)
- return false;
-
- bytes = bytes0;
+// byte[] bytes0 = commState.getByteArray(null, GridClientHandshakeRequest.PACKET_SIZE);
+//
+// if (bytes0 == BYTE_ARR_NOT_READ)
+// return false;
+//
+// bytes = bytes0;
commState.idx++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java
index bc5b7af..c77964b 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientHandshakeResponseWrapper.java
@@ -43,7 +43,7 @@ public class GridClientHandshakeResponseWrapper extends GridTcpCommunicationMess
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientMessageWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientMessageWrapper.java
index 99e1306..4779afb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientMessageWrapper.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientMessageWrapper.java
@@ -125,7 +125,7 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -133,32 +133,32 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter
switch (commState.idx) {
case 0:
- if (!commState.putIntClient(msgSize))
+ if (!commState.putInt(null, msgSize))
return false;
commState.idx++;
case 1:
- if (!commState.putLongClient(reqId))
+ if (!commState.putLong(null, reqId))
return false;
commState.idx++;
case 2:
- if (!commState.putUuidClient(clientId))
+ if (!commState.putUuid(null, clientId))
return false;
commState.idx++;
case 3:
- if (!commState.putUuidClient(destId))
+ if (!commState.putUuid(null, destId))
return false;
commState.idx++;
case 4:
- if (!commState.putByteBufferClient(msg))
- return false;
+// if (!commState.putByteBuffer(null, msg))
+// return false;
commState.idx++;
@@ -176,7 +176,7 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter
if (buf.remaining() < 4)
return false;
- msgSize = commState.getIntClient();
+ msgSize = commState.getInt(null);
if (msgSize == 0) // Ping message.
return true;
@@ -187,12 +187,12 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter
if (buf.remaining() < 8)
return false;
- reqId = commState.getLongClient();
+ reqId = commState.getLong(null);
commState.idx++;
case 2:
- UUID clientId0 = commState.getUuidClient();
+ UUID clientId0 = commState.getUuid(null);
if (clientId0 == UUID_NOT_READ)
return false;
@@ -202,7 +202,7 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter
commState.idx++;
case 3:
- UUID destId0 = commState.getUuidClient();
+ UUID destId0 = commState.getUuid(null);
if (destId0 == UUID_NOT_READ)
return false;
@@ -212,14 +212,14 @@ public class GridClientMessageWrapper extends GridTcpCommunicationMessageAdapter
commState.idx++;
case 4:
- byte[] msg0 = commState.getByteArrayClient(msgSize - 40);
-
- if (msg0 == BYTE_ARR_NOT_READ)
- return false;
-
- msg = ByteBuffer.wrap(msg0);
-
- commState.idx++;
+// byte[] msg0 = commState.getByteArray(null, msgSize - 40);
+//
+// if (msg0 == BYTE_ARR_NOT_READ)
+// return false;
+//
+// msg = ByteBuffer.wrap(msg0);
+//
+// commState.idx++;
}
return true;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientPingPacketWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientPingPacketWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientPingPacketWrapper.java
index 517b804..b719531 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientPingPacketWrapper.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/client/message/GridClientPingPacketWrapper.java
@@ -29,7 +29,7 @@ public class GridClientPingPacketWrapper extends GridTcpCommunicationMessageAdap
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -37,7 +37,7 @@ public class GridClientPingPacketWrapper extends GridTcpCommunicationMessageAdap
switch (commState.idx) {
case 0:
- if (!commState.putIntClient(size))
+ if (!commState.putInt(null, size))
return false;
commState.idx++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultRequest.java
index 0485f3f..2b66ef6 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultRequest.java
@@ -115,7 +115,7 @@ public class GridTaskResultRequest extends GridTcpCommunicationMessageAdapter {
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -123,13 +123,13 @@ public class GridTaskResultRequest extends GridTcpCommunicationMessageAdapter {
switch (commState.idx) {
case 0:
- if (!commState.putGridUuid(taskId))
+ if (!commState.putGridUuid(null, taskId))
return false;
commState.idx++;
case 1:
- if (!commState.putByteArray(topicBytes))
+ if (!commState.putByteArray(null, topicBytes))
return false;
commState.idx++;
@@ -146,7 +146,7 @@ public class GridTaskResultRequest extends GridTcpCommunicationMessageAdapter {
switch (commState.idx) {
case 0:
- IgniteUuid taskId0 = commState.getGridUuid();
+ IgniteUuid taskId0 = commState.getGridUuid(null);
if (taskId0 == GRID_UUID_NOT_READ)
return false;
@@ -156,7 +156,7 @@ public class GridTaskResultRequest extends GridTcpCommunicationMessageAdapter {
commState.idx++;
case 1:
- byte[] topicBytes0 = commState.getByteArray();
+ byte[] topicBytes0 = commState.getByteArray(null);
if (topicBytes0 == BYTE_ARR_NOT_READ)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultResponse.java
index 91cb28f..ad9e42f 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/handlers/task/GridTaskResultResponse.java
@@ -135,7 +135,7 @@ public class GridTaskResultResponse extends GridTcpCommunicationMessageAdapter {
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -143,25 +143,25 @@ public class GridTaskResultResponse extends GridTcpCommunicationMessageAdapter {
switch (commState.idx) {
case 0:
- if (!commState.putString(err))
+ if (!commState.putString(null, err))
return false;
commState.idx++;
case 1:
- if (!commState.putBoolean(finished))
+ if (!commState.putBoolean(null, finished))
return false;
commState.idx++;
case 2:
- if (!commState.putBoolean(found))
+ if (!commState.putBoolean(null, found))
return false;
commState.idx++;
case 3:
- if (!commState.putByteArray(resBytes))
+ if (!commState.putByteArray(null, resBytes))
return false;
commState.idx++;
@@ -178,7 +178,7 @@ public class GridTaskResultResponse extends GridTcpCommunicationMessageAdapter {
switch (commState.idx) {
case 0:
- String err0 = commState.getString();
+ String err0 = commState.getString(null);
if (err0 == STR_NOT_READ)
return false;
@@ -191,7 +191,7 @@ public class GridTaskResultResponse extends GridTcpCommunicationMessageAdapter {
if (buf.remaining() < 1)
return false;
- finished = commState.getBoolean();
+ finished = commState.getBoolean(null);
commState.idx++;
@@ -199,12 +199,12 @@ public class GridTaskResultResponse extends GridTcpCommunicationMessageAdapter {
if (buf.remaining() < 1)
return false;
- found = commState.getBoolean();
+ found = commState.getBoolean(null);
commState.idx++;
case 3:
- byte[] resBytes0 = commState.getByteArray();
+ byte[] resBytes0 = commState.getByteArray(null);
if (resBytes0 == BYTE_ARR_NOT_READ)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java
index 40c0331..077aa93 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridMemcachedMessageWrapper.java
@@ -58,7 +58,7 @@ public class GridMemcachedMessageWrapper extends GridTcpCommunicationMessageAdap
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -66,7 +66,7 @@ public class GridMemcachedMessageWrapper extends GridTcpCommunicationMessageAdap
switch (commState.idx) {
case 0:
- if (!commState.putByteArrayClient(bytes))
+ if (!commState.putByteArray(null, bytes))
return false;
commState.idx++;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java
index d42c762..99dcd44 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestDirectParser.java
@@ -38,20 +38,16 @@ public class GridTcpRestDirectParser implements GridNioParser {
/** Protocol handler. */
private final GridTcpRestProtocol proto;
- /** Message reader. */
- private final GridNioMessageReader msgReader;
-
/**
* @param proto Protocol handler.
- * @param msgReader Message reader.
*/
- public GridTcpRestDirectParser(GridTcpRestProtocol proto, GridNioMessageReader msgReader) {
+ public GridTcpRestDirectParser(GridTcpRestProtocol proto) {
this.proto = proto;
- this.msgReader = msgReader;
}
/** {@inheritDoc} */
- @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf) throws IOException, IgniteCheckedException {
+ @Nullable @Override public Object decode(GridNioSession ses, ByteBuffer buf)
+ throws IOException, IgniteCheckedException {
ParserState state = ses.removeMeta(PARSER_STATE.ordinal());
if (state != null) {
@@ -100,7 +96,7 @@ public class GridTcpRestDirectParser implements GridNioParser {
boolean finished = false;
if (buf.hasRemaining())
- finished = msgReader.read(null, msg, buf);
+ finished = msg.readFrom(buf);
if (finished) {
if (msg instanceof GridClientMessageWrapper) {
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
index 24e259f..1974347 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/rest/protocols/tcp/GridTcpRestProtocol.java
@@ -22,7 +22,6 @@ import org.gridgain.grid.kernal.*;
import org.gridgain.grid.kernal.processors.rest.*;
import org.gridgain.grid.kernal.processors.rest.client.message.*;
import org.gridgain.grid.kernal.processors.rest.protocols.*;
-import org.gridgain.grid.util.direct.*;
import org.gridgain.grid.util.nio.*;
import org.gridgain.grid.util.nio.ssl.*;
import org.gridgain.grid.util.typedef.internal.*;
@@ -49,59 +48,6 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
/** NIO server listener. */
private GridTcpRestNioListener lsnr;
- /** Message reader. */
- private final GridNioMessageReader msgReader = new GridNioMessageReader() {
- @Override public boolean read(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) {
- assert msg != null;
- assert buf != null;
-
- msg.messageReader(this, nodeId);
-
- return msg.readFrom(buf);
- }
-
- @Nullable @Override public GridTcpMessageFactory messageFactory() {
- return null;
- }
- };
-
- /** Message writer. */
- private final GridNioMessageWriter msgWriter = new GridNioMessageWriter() {
- @Override public boolean write(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, ByteBuffer buf) {
- assert msg != null;
- assert buf != null;
-
- msg.messageWriter(this, nodeId);
-
- return msg.writeTo(buf);
- }
-
- @Override public int writeFully(@Nullable UUID nodeId, GridTcpCommunicationMessageAdapter msg, OutputStream out,
- ByteBuffer buf) throws IOException {
- assert msg != null;
- assert out != null;
- assert buf != null;
- assert buf.hasArray();
-
- msg.messageWriter(this, nodeId);
-
- boolean finished = false;
- int cnt = 0;
-
- while (!finished) {
- finished = msg.writeTo(buf);
-
- out.write(buf.array(), 0, buf.position());
-
- cnt += buf.position();
-
- buf.clear();
- }
-
- return cnt;
- }
- };
-
/** @param ctx Context. */
public GridTcpRestProtocol(GridKernalContext ctx) {
super(ctx);
@@ -152,7 +98,7 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
lsnr = new GridTcpRestNioListener(log, this, hnd, ctx);
- GridNioParser parser = new GridTcpRestDirectParser(this, msgReader);
+ GridNioParser parser = new GridTcpRestDirectParser(this);
try {
host = resolveRestTcpHost(ctx.config());
@@ -291,7 +237,6 @@ public class GridTcpRestProtocol extends GridRestProtocolAdapter {
.sendQueueLimit(cfg.getRestTcpSendQueueLimit())
.filters(filters)
.directMode(true)
- .messageWriter(msgWriter)
.build();
srv.idleTimeout(cfg.getRestIdleTimeout());
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerCancelRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerCancelRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerCancelRequest.java
index be602db..cc56dc8 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerCancelRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerCancelRequest.java
@@ -69,7 +69,7 @@ public class GridStreamerCancelRequest extends GridTcpCommunicationMessageAdapte
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -77,7 +77,7 @@ public class GridStreamerCancelRequest extends GridTcpCommunicationMessageAdapte
switch (commState.idx) {
case 0:
- if (!commState.putGridUuid(cancelledFutId))
+ if (!commState.putGridUuid(null, cancelledFutId))
return false;
commState.idx++;
@@ -94,7 +94,7 @@ public class GridStreamerCancelRequest extends GridTcpCommunicationMessageAdapte
switch (commState.idx) {
case 0:
- IgniteUuid cancelledFutId0 = commState.getGridUuid();
+ IgniteUuid cancelledFutId0 = commState.getGridUuid(null);
if (cancelledFutId0 == GRID_UUID_NOT_READ)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerExecutionRequest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerExecutionRequest.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerExecutionRequest.java
index 40e8d5a..39c8cf2 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerExecutionRequest.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerExecutionRequest.java
@@ -170,7 +170,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -178,25 +178,25 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
switch (commState.idx) {
case 0:
- if (!commState.putByteArray(batchBytes))
+ if (!commState.putByteArray(null, batchBytes))
return false;
commState.idx++;
case 1:
- if (!commState.putGridUuid(clsLdrId))
+ if (!commState.putGridUuid(null, clsLdrId))
return false;
commState.idx++;
case 2:
- if (!commState.putEnum(depMode))
+ if (!commState.putEnum(null, depMode))
return false;
commState.idx++;
case 3:
- if (!commState.putBoolean(forceLocDep))
+ if (!commState.putBoolean(null, forceLocDep))
return false;
commState.idx++;
@@ -204,7 +204,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
case 4:
if (ldrParticipants != null) {
if (commState.it == null) {
- if (!commState.putInt(ldrParticipants.size()))
+ if (!commState.putInt(null, ldrParticipants.size()))
return false;
commState.it = ldrParticipants.entrySet().iterator();
@@ -217,13 +217,13 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
Map.Entry<UUID, IgniteUuid> e = (Map.Entry<UUID, IgniteUuid>)commState.cur;
if (!commState.keyDone) {
- if (!commState.putUuid(e.getKey()))
+ if (!commState.putUuid(null, e.getKey()))
return false;
commState.keyDone = true;
}
- if (!commState.putGridUuid(e.getValue()))
+ if (!commState.putGridUuid(null, e.getValue()))
return false;
commState.keyDone = false;
@@ -233,20 +233,20 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
commState.it = null;
} else {
- if (!commState.putInt(-1))
+ if (!commState.putInt(null, -1))
return false;
}
commState.idx++;
case 5:
- if (!commState.putString(sampleClsName))
+ if (!commState.putString(null, sampleClsName))
return false;
commState.idx++;
case 6:
- if (!commState.putString(userVer))
+ if (!commState.putString(null, userVer))
return false;
commState.idx++;
@@ -263,7 +263,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
switch (commState.idx) {
case 0:
- byte[] batchBytes0 = commState.getByteArray();
+ byte[] batchBytes0 = commState.getByteArray(null);
if (batchBytes0 == BYTE_ARR_NOT_READ)
return false;
@@ -273,7 +273,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
commState.idx++;
case 1:
- IgniteUuid clsLdrId0 = commState.getGridUuid();
+ IgniteUuid clsLdrId0 = commState.getGridUuid(null);
if (clsLdrId0 == GRID_UUID_NOT_READ)
return false;
@@ -286,7 +286,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
if (buf.remaining() < 1)
return false;
- byte depMode0 = commState.getByte();
+ byte depMode0 = commState.getByte(null);
depMode = IgniteDeploymentMode.fromOrdinal(depMode0);
@@ -296,7 +296,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
if (buf.remaining() < 1)
return false;
- forceLocDep = commState.getBoolean();
+ forceLocDep = commState.getBoolean(null);
commState.idx++;
@@ -305,7 +305,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
if (buf.remaining() < 4)
return false;
- commState.readSize = commState.getInt();
+ commState.readSize = commState.getInt(null);
}
if (commState.readSize >= 0) {
@@ -314,7 +314,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
for (int i = commState.readItems; i < commState.readSize; i++) {
if (!commState.keyDone) {
- UUID _val = commState.getUuid();
+ UUID _val = commState.getUuid(null);
if (_val == UUID_NOT_READ)
return false;
@@ -323,7 +323,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
commState.keyDone = true;
}
- IgniteUuid _val = commState.getGridUuid();
+ IgniteUuid _val = commState.getGridUuid(null);
if (_val == GRID_UUID_NOT_READ)
return false;
@@ -343,7 +343,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
commState.idx++;
case 5:
- String sampleClsName0 = commState.getString();
+ String sampleClsName0 = commState.getString(null);
if (sampleClsName0 == STR_NOT_READ)
return false;
@@ -353,7 +353,7 @@ public class GridStreamerExecutionRequest extends GridTcpCommunicationMessageAda
commState.idx++;
case 6:
- String userVer0 = commState.getString();
+ String userVer0 = commState.getString(null);
if (userVer0 == STR_NOT_READ)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerResponse.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerResponse.java b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerResponse.java
index 2933e45..fc3cfcb 100644
--- a/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerResponse.java
+++ b/modules/core/src/main/java/org/gridgain/grid/kernal/processors/streamer/GridStreamerResponse.java
@@ -90,7 +90,7 @@ public class GridStreamerResponse extends GridTcpCommunicationMessageAdapter {
commState.setBuffer(buf);
if (!commState.typeWritten) {
- if (!commState.putByte(directType()))
+ if (!commState.putByte(null, directType()))
return false;
commState.typeWritten = true;
@@ -98,13 +98,13 @@ public class GridStreamerResponse extends GridTcpCommunicationMessageAdapter {
switch (commState.idx) {
case 0:
- if (!commState.putByteArray(errBytes))
+ if (!commState.putByteArray(null, errBytes))
return false;
commState.idx++;
case 1:
- if (!commState.putGridUuid(futId))
+ if (!commState.putGridUuid(null, futId))
return false;
commState.idx++;
@@ -121,7 +121,7 @@ public class GridStreamerResponse extends GridTcpCommunicationMessageAdapter {
switch (commState.idx) {
case 0:
- byte[] errBytes0 = commState.getByteArray();
+ byte[] errBytes0 = commState.getByteArray(null);
if (errBytes0 == BYTE_ARR_NOT_READ)
return false;
@@ -131,7 +131,7 @@ public class GridStreamerResponse extends GridTcpCommunicationMessageAdapter {
commState.idx++;
case 1:
- IgniteUuid futId0 = commState.getGridUuid();
+ IgniteUuid futId0 = commState.getGridUuid(null);
if (futId0 == GRID_UUID_NOT_READ)
return false;
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/util/GridLongList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/GridLongList.java b/modules/core/src/main/java/org/gridgain/grid/util/GridLongList.java
index f740ad9..857f80a 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/GridLongList.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/GridLongList.java
@@ -369,10 +369,14 @@ public class GridLongList implements Externalizable {
}
/**
- * @return Internal array.
+ * @return Copy of the first {@code idx} elements of the internal array.
*/
- public long[] internalArray() {
- return arr;
+ public long[] array() {
+ long[] res = new long[idx];
+
+ System.arraycopy(arr, 0, res, 0, idx);
+
+ return res;
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java b/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java
index 803badd..3e9f61c 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/GridUtils.java
@@ -27,6 +27,7 @@ import org.gridgain.grid.kernal.managers.deployment.*;
import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.streamer.*;
import org.apache.ignite.spi.discovery.*;
+import org.gridgain.grid.util.direct.*;
import org.gridgain.grid.util.io.*;
import org.gridgain.grid.util.lang.*;
import org.gridgain.grid.util.typedef.*;
@@ -9027,4 +9028,37 @@ public abstract class GridUtils {
return list;
}
+
+ /**
+ * Fully writes communication message to provided stream: {@code msg.writeTo(buf)} is called until it returns
+ * {@code true}, and after each call bytes {@code [0, buf.position())} of the backing array go to {@code out}.
+ * NOTE(review): presumably {@code buf} must start at position 0 with zero array offset - confirm with callers.
+ * @param msg Message.
+ * @param out Stream to write to.
+ * @param buf Array-backed buffer passed to {@link GridTcpCommunicationMessageAdapter#writeTo(ByteBuffer)}.
+ * @return Number of written bytes.
+ * @throws IOException In case of error.
+ */
+ public static int writeMessageFully(GridTcpCommunicationMessageAdapter msg, OutputStream out, ByteBuffer buf)
+ throws IOException {
+ assert msg != null;
+ assert out != null;
+ assert buf != null;
+ assert buf.hasArray();
+
+ boolean finished = false;
+ int cnt = 0;
+
+ while (!finished) {
+ finished = msg.writeTo(buf);
+
+ out.write(buf.array(), 0, buf.position());
+
+ cnt += buf.position();
+
+ buf.clear();
+ }
+
+ return cnt;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridPortableByteBufferStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridPortableByteBufferStream.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridPortableByteBufferStream.java
new file mode 100644
index 0000000..4433cbf
--- /dev/null
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridPortableByteBufferStream.java
@@ -0,0 +1,726 @@
+/* @java.file.header */
+
+/* _________ _____ __________________ _____
+ * __ ____/___________(_)______ /__ ____/______ ____(_)_______
+ * _ / __ __ ___/__ / _ __ / _ / __ _ __ `/__ / __ __ \
+ * / /_/ / _ / _ / / /_/ / / /_/ / / /_/ / _ / _ / / /
+ * \____/ /_/ /_/ \_,__/ \____/ \__,_/ /_/ /_/ /_/
+ */
+
+package org.gridgain.grid.util.direct;
+
+import org.gridgain.grid.kernal.processors.portable.*;
+import org.gridgain.grid.util.*;
+import sun.misc.*;
+import sun.nio.ch.*;
+
+import java.nio.*;
+
+import static org.gridgain.grid.util.direct.GridTcpCommunicationMessageAdapter.*;
+
+/**
+ * Portable stream based on {@link ByteBuffer}.
+ */
+public class GridPortableByteBufferStream implements GridPortableOutputStream, GridPortableInputStream {
+ /** Unsafe instance used for all raw memory access in this class. */
+ private static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** Base offset of the first element in a {@code byte[]}. */
+ private static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** Base offset of the first element in a {@code short[]}. */
+ private static final long SHORT_ARR_OFF = UNSAFE.arrayBaseOffset(short[].class);
+
+ /** Base offset of the first element in an {@code int[]}. */
+ private static final long INT_ARR_OFF = UNSAFE.arrayBaseOffset(int[].class);
+
+ /** Base offset of the first element in a {@code long[]}. */
+ private static final long LONG_ARR_OFF = UNSAFE.arrayBaseOffset(long[].class);
+
+ /** Base offset of the first element in a {@code float[]}. */
+ private static final long FLOAT_ARR_OFF = UNSAFE.arrayBaseOffset(float[].class);
+
+ /** Base offset of the first element in a {@code double[]}. */
+ private static final long DOUBLE_ARR_OFF = UNSAFE.arrayBaseOffset(double[].class);
+
+ /** Base offset of the first element in a {@code char[]}. */
+ private static final long CHAR_ARR_OFF = UNSAFE.arrayBaseOffset(char[].class);
+
+ /** Base offset of the first element in a {@code boolean[]}. */
+ private static final long BOOLEAN_ARR_OFF = UNSAFE.arrayBaseOffset(boolean[].class);
+
+ /** Shared empty {@code byte[]} returned for zero-length reads. */
+ private static final byte[] BYTE_ARR_EMPTY = new byte[0];
+
+ /** Shared empty {@code short[]} returned for zero-length reads. */
+ private static final short[] SHORT_ARR_EMPTY = new short[0];
+
+ /** Shared empty {@code int[]} returned for zero-length reads. */
+ private static final int[] INT_ARR_EMPTY = new int[0];
+
+ /** Shared empty {@code long[]} returned for zero-length reads. */
+ private static final long[] LONG_ARR_EMPTY = new long[0];
+
+ /** Shared empty {@code float[]} returned for zero-length reads. */
+ private static final float[] FLOAT_ARR_EMPTY = new float[0];
+
+ /** Shared empty {@code double[]} returned for zero-length reads. */
+ private static final double[] DOUBLE_ARR_EMPTY = new double[0];
+
+ /** Shared empty {@code char[]} returned for zero-length reads. */
+ private static final char[] CHAR_ARR_EMPTY = new char[0];
+
+ /** Shared empty {@code boolean[]} returned for zero-length reads. */
+ private static final boolean[] BOOLEAN_ARR_EMPTY = new boolean[0];
+
+ /** Creates {@code byte[]}: "not read" marker for {@code -1}, shared empty array for {@code 0}, new array otherwise. */
+ private static final ArrayCreator<byte[]> BYTE_ARR_CREATOR = new ArrayCreator<byte[]>() {
+ @Override public byte[] create(int len) {
+ switch (len) {
+ case -1:
+ return BYTE_ARR_NOT_READ;
+
+ case 0:
+ return BYTE_ARR_EMPTY;
+
+ default:
+ return new byte[len];
+ }
+ }
+ };
+
+ /** Creates {@code short[]}: "not read" marker for {@code -1}, shared empty array for {@code 0}, new array otherwise. */
+ private static final ArrayCreator<short[]> SHORT_ARR_CREATOR = new ArrayCreator<short[]>() {
+ @Override public short[] create(int len) {
+ switch (len) {
+ case -1:
+ return SHORT_ARR_NOT_READ;
+
+ case 0:
+ return SHORT_ARR_EMPTY;
+
+ default:
+ return new short[len];
+ }
+ }
+ };
+
+ /** Creates {@code int[]}: "not read" marker for {@code -1}, shared empty array for {@code 0}, new array otherwise. */
+ private static final ArrayCreator<int[]> INT_ARR_CREATOR = new ArrayCreator<int[]>() {
+ @Override public int[] create(int len) {
+ switch (len) {
+ case -1:
+ return INT_ARR_NOT_READ;
+
+ case 0:
+ return INT_ARR_EMPTY;
+
+ default:
+ return new int[len];
+ }
+ }
+ };
+
+ /** Creates {@code long[]}: "not read" marker for {@code -1}, shared empty array for {@code 0}, new array otherwise. */
+ private static final ArrayCreator<long[]> LONG_ARR_CREATOR = new ArrayCreator<long[]>() {
+ @Override public long[] create(int len) {
+ switch (len) {
+ case -1:
+ return LONG_ARR_NOT_READ;
+
+ case 0:
+ return LONG_ARR_EMPTY;
+
+ default:
+ return new long[len];
+ }
+ }
+ };
+
+ /** Creates {@code float[]}: "not read" marker for {@code -1}, shared empty array for {@code 0}, new array otherwise. */
+ private static final ArrayCreator<float[]> FLOAT_ARR_CREATOR = new ArrayCreator<float[]>() {
+ @Override public float[] create(int len) {
+ switch (len) {
+ case -1:
+ return FLOAT_ARR_NOT_READ;
+
+ case 0:
+ return FLOAT_ARR_EMPTY;
+
+ default:
+ return new float[len];
+ }
+ }
+ };
+
+ /** Creates {@code double[]}: "not read" marker for {@code -1}, shared empty array for {@code 0}, new array otherwise. */
+ private static final ArrayCreator<double[]> DOUBLE_ARR_CREATOR = new ArrayCreator<double[]>() {
+ @Override public double[] create(int len) {
+ switch (len) {
+ case -1:
+ return DOUBLE_ARR_NOT_READ;
+
+ case 0:
+ return DOUBLE_ARR_EMPTY;
+
+ default:
+ return new double[len];
+ }
+ }
+ };
+
+ /** Creates {@code char[]}: "not read" marker for {@code -1}, shared empty array for {@code 0}, new array otherwise. */
+ private static final ArrayCreator<char[]> CHAR_ARR_CREATOR = new ArrayCreator<char[]>() {
+ @Override public char[] create(int len) {
+ switch (len) {
+ case -1:
+ return CHAR_ARR_NOT_READ;
+
+ case 0:
+ return CHAR_ARR_EMPTY;
+
+ default:
+ return new char[len];
+ }
+ }
+ };
+
+ /** Creates {@code boolean[]}: "not read" marker for {@code -1}, shared empty array for {@code 0}, new array otherwise. */
+ private static final ArrayCreator<boolean[]> BOOLEAN_ARR_CREATOR = new ArrayCreator<boolean[]>() {
+ @Override public boolean[] create(int len) {
+ switch (len) {
+ case -1:
+ return BOOLEAN_ARR_NOT_READ;
+
+ case 0:
+ return BOOLEAN_ARR_EMPTY;
+
+ default:
+ return new boolean[len];
+ }
+ }
+ };
+
+ /** Buffer currently read from or written to; assigned in {@link #setBuffer(ByteBuffer)}. */
+ private ByteBuffer buf;
+
+ /** Backing array of {@link #buf} for non-direct buffers, {@code null} for direct buffers. */
+ private byte[] heapArr;
+
+ /** Base offset for {@code Unsafe} access: native address for direct buffers, byte array base offset otherwise. */
+ private long baseOff;
+
+ /** Number of bytes of the array currently being written that were already copied to the buffer. */
+ private int arrOff;
+
+ /** Array currently being read, or {@code null} if no array read is in progress. */
+ private Object tmpArr;
+
+ /** Number of bytes already copied into {@link #tmpArr}. */
+ private int tmpArrOff;
+
+ /** Total size of {@link #tmpArr} in bytes. */
+ private int tmpArrBytes;
+
+ /** Whether the type byte of the message currently being read was already consumed. */
+ private boolean msgTypeDone;
+
+ /** Message currently being read; kept between {@link #readMessage()} calls until it is fully read. */
+ private GridTcpCommunicationMessageAdapter msg;
+
+ /** Result of the last array or message write; reset by {@link #lastWritten()}. */
+ private boolean lastWritten;
+
+ /**
+ * @param buf Buffer to operate on; backing array and base offset are recomputed only if the instance differs.
+ */
+ public final void setBuffer(ByteBuffer buf) {
+ assert buf != null;
+
+ if (this.buf != buf) {
+ this.buf = buf;
+
+ heapArr = buf.isDirect() ? null : buf.array();
+ baseOff = buf.isDirect() ? ((DirectBuffer)buf).address() : BYTE_ARR_OFF;
+ }
+ }
+
+ /**
+ * @return Whether last object was fully written; the flag is reset to {@code false} by this call.
+ */
+ public boolean lastWritten() {
+ boolean lastWritten0 = lastWritten;
+
+ lastWritten = false;
+
+ return lastWritten0;
+ }
+
+ /**
+ * @param msg Message to write to the current buffer; completion is reported through {@link #lastWritten()}.
+ */
+ public void writeMessage(GridTcpCommunicationMessageAdapter msg) {
+ assert msg != null;
+
+ lastWritten = msg.writeTo(buf);
+ }
+
+ /**
+ * @return Fully read message, or {@code MSG_NOT_READ} if more data is needed (read state is kept for next call).
+ */
+ public GridTcpCommunicationMessageAdapter readMessage() {
+ if (!msgTypeDone) {
+ if (!buf.hasRemaining())
+ return MSG_NOT_READ;
+
+ byte type = readByte();
+
+ msg = GridTcpCommunicationMessageFactory.create(type);
+
+ msgTypeDone = true;
+ }
+
+ if (msg.readFrom(buf)) {
+ GridTcpCommunicationMessageAdapter msg0 = msg;
+
+ msgTypeDone = false;
+ msg = null;
+
+ return msg0;
+ }
+ else
+ return MSG_NOT_READ;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeByte(byte val) {
+ int pos = buf.position();
+
+ UNSAFE.putByte(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeByteArray(byte[] val) {
+ assert val != null;
+
+ lastWritten = writeArray(val, BYTE_ARR_OFF, val.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBoolean(boolean val) {
+ int pos = buf.position();
+
+ UNSAFE.putBoolean(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeBooleanArray(boolean[] val) {
+ assert val != null;
+
+ lastWritten = writeArray(val, BOOLEAN_ARR_OFF, val.length);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShort(short val) {
+ int pos = buf.position();
+
+ UNSAFE.putShort(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeShortArray(short[] val) {
+ assert val != null;
+
+ lastWritten = writeArray(val, SHORT_ARR_OFF, val.length << 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeChar(char val) {
+ int pos = buf.position();
+
+ UNSAFE.putChar(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeCharArray(char[] val) {
+ assert val != null;
+
+ lastWritten = writeArray(val, CHAR_ARR_OFF, val.length << 1);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int val) {
+ int pos = buf.position();
+
+ UNSAFE.putInt(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 4);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeInt(int pos, int val) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeIntArray(int[] val) {
+ assert val != null;
+
+ lastWritten = writeArray(val, INT_ARR_OFF, val.length << 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloat(float val) {
+ int pos = buf.position();
+
+ UNSAFE.putFloat(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 4);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeFloatArray(float[] val) {
+ assert val != null;
+
+ lastWritten = writeArray(val, FLOAT_ARR_OFF, val.length << 2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLong(long val) {
+ int pos = buf.position();
+
+ UNSAFE.putLong(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 8);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeLongArray(long[] val) {
+ assert val != null;
+
+ lastWritten = writeArray(val, LONG_ARR_OFF, val.length << 3);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDouble(double val) {
+ int pos = buf.position();
+
+ UNSAFE.putDouble(heapArr, baseOff + pos, val);
+
+ buf.position(pos + 8);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeDoubleArray(double[] val) {
+ assert val != null;
+
+ lastWritten = writeArray(val, DOUBLE_ARR_OFF, val.length << 3);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(byte[] arr, int off, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void write(long addr, int cnt) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte readByte() {
+ assert buf.hasRemaining();
+
+ int pos = buf.position();
+
+ buf.position(pos + 1);
+
+ return UNSAFE.getByte(heapArr, baseOff + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] readByteArray(int cnt) {
+ return readArray(BYTE_ARR_CREATOR, cnt, 0, BYTE_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readBoolean() {
+ assert buf.hasRemaining();
+
+ int pos = buf.position();
+
+ buf.position(pos + 1);
+
+ return UNSAFE.getBoolean(heapArr, baseOff + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean[] readBooleanArray(int cnt) {
+ return readArray(BOOLEAN_ARR_CREATOR, cnt, 0, BOOLEAN_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short readShort() {
+ assert buf.remaining() >= 2;
+
+ int pos = buf.position();
+
+ buf.position(pos + 2);
+
+ return UNSAFE.getShort(heapArr, baseOff + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short[] readShortArray(int cnt) {
+ return readArray(SHORT_ARR_CREATOR, cnt, 1, SHORT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public char readChar() {
+ assert buf.remaining() >= 2;
+
+ int pos = buf.position();
+
+ buf.position(pos + 2);
+
+ return UNSAFE.getChar(heapArr, baseOff + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public char[] readCharArray(int cnt) {
+ return readArray(CHAR_ARR_CREATOR, cnt, 1, CHAR_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt() {
+ assert buf.remaining() >= 4;
+
+ int pos = buf.position();
+
+ buf.position(pos + 4);
+
+ return UNSAFE.getInt(heapArr, baseOff + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readInt(int pos) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int[] readIntArray(int cnt) {
+ return readArray(INT_ARR_CREATOR, cnt, 2, INT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public float readFloat() {
+ assert buf.remaining() >= 4;
+
+ int pos = buf.position();
+
+ buf.position(pos + 4);
+
+ return UNSAFE.getFloat(heapArr, baseOff + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public float[] readFloatArray(int cnt) {
+ return readArray(FLOAT_ARR_CREATOR, cnt, 2, FLOAT_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long readLong() {
+ assert buf.remaining() >= 8;
+
+ int pos = buf.position();
+
+ buf.position(pos + 8);
+
+ return UNSAFE.getLong(heapArr, baseOff + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public long[] readLongArray(int cnt) {
+ return readArray(LONG_ARR_CREATOR, cnt, 3, LONG_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double readDouble() {
+ assert buf.remaining() >= 8;
+
+ int pos = buf.position();
+
+ buf.position(pos + 8);
+
+ return UNSAFE.getDouble(heapArr, baseOff + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public double[] readDoubleArray(int cnt) {
+ return readArray(DOUBLE_ARR_CREATOR, cnt, 3, DOUBLE_ARR_OFF);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(byte[] arr, int off, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int read(long addr, int len) {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public int position() {
+ return buf.position();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void position(int pos) {
+ buf.position(pos);
+ }
+
+ /** @return Number of bytes remaining in the underlying buffer. */
+ public int remaining() {
+ return buf.remaining();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] array() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte[] arrayCopy() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long offheapPointer() {
+ throw new UnsupportedOperationException();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasArray() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * @param arr Primitive array to copy from; copying resumes after the {@code arrOff} bytes written by earlier calls.
+ * @param off Base offset of array data for {@code Unsafe} access.
+ * @param bytes Total array length in bytes.
+ * @return Whether array was fully written; {@code false} if the buffer ran out of space first.
+ */
+ private boolean writeArray(Object arr, long off, int bytes) {
+ assert arr != null;
+ assert arr.getClass().isArray() && arr.getClass().getComponentType().isPrimitive();
+ assert off > 0;
+ assert bytes >= 0;
+ assert bytes >= arrOff;
+
+ if (!buf.hasRemaining())
+ return false;
+
+ int toWrite = bytes - arrOff;
+ int pos = buf.position();
+ int remaining = buf.remaining();
+
+ if (toWrite <= remaining) {
+ UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, toWrite);
+
+ pos += toWrite;
+
+ buf.position(pos);
+
+ arrOff = 0;
+
+ return true;
+ }
+ else {
+ UNSAFE.copyMemory(arr, off + arrOff, heapArr, baseOff + pos, remaining);
+
+ pos += remaining;
+
+ buf.position(pos);
+
+ arrOff += remaining;
+
+ return false;
+ }
+ }
+
+ /**
+ * @param creator Array creator.
+ * @param len Array length in elements; used only when no partially read array is pending.
+ * @param lenShift Array length shift size ({@code len << lenShift} gives the array size in bytes).
+ * @param off Base offset of array data for {@code Unsafe} access.
+ * @return Array or special value if it was not fully read.
+ */
+ private <T> T readArray(ArrayCreator<T> creator, int len, int lenShift, long off) {
+ assert creator != null;
+
+ if (tmpArr == null) {
+ assert len >= 0;
+
+ switch (len) {
+ case 0:
+ return creator.create(0);
+
+ default:
+ tmpArr = creator.create(len);
+ tmpArrBytes = len << lenShift;
+ }
+ }
+
+ int toRead = tmpArrBytes - tmpArrOff;
+ int remaining = buf.remaining();
+ int pos = buf.position();
+
+ if (remaining < toRead) {
+ UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, remaining);
+
+ buf.position(pos + remaining);
+
+ tmpArrOff += remaining;
+
+ return creator.create(-1);
+ }
+ else {
+ UNSAFE.copyMemory(heapArr, baseOff + pos, tmpArr, off + tmpArrOff, toRead);
+
+ buf.position(pos + toRead);
+
+ T arr = (T)tmpArr;
+
+ tmpArr = null;
+ tmpArrBytes = 0;
+ tmpArrOff = 0;
+
+ return arr;
+ }
+ }
+
+ /**
+ * Array creator.
+ */
+ private static interface ArrayCreator<T> {
+ /**
+ * @param len Array length or {@code -1} if array was not fully read.
+ * @return Array of the given length, or the "not read" marker array when {@code len} is {@code -1}.
+ */
+ public T create(int len);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/d21e6b4b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
index af63a2f..8961688 100644
--- a/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
+++ b/modules/core/src/main/java/org/gridgain/grid/util/direct/GridTcpCommunicationMessageAdapter.java
@@ -14,8 +14,6 @@ import org.gridgain.grid.kernal.processors.cache.*;
import org.gridgain.grid.kernal.processors.cache.distributed.dht.preloader.*;
import org.gridgain.grid.kernal.processors.clock.*;
import org.gridgain.grid.util.*;
-import org.gridgain.grid.util.nio.*;
-import org.jetbrains.annotations.*;
import java.io.*;
import java.nio.*;
@@ -120,26 +118,6 @@ public abstract class GridTcpCommunicationMessageAdapter implements Serializable
protected final GridTcpCommunicationMessageState commState = new GridTcpCommunicationMessageState();
/**
- * @param msgWriter Message writer.
- * @param nodeId Node ID (provided only if versions are different).
- */
- public void messageWriter(GridNioMessageWriter msgWriter, @Nullable UUID nodeId) {
- assert msgWriter != null;
-
- commState.messageWriter(msgWriter, nodeId);
- }
-
- /**
- * @param msgReader Message reader.
- * @param nodeId Node ID (provided only if versions are different).
- */
- public void messageReader(GridNioMessageReader msgReader, @Nullable UUID nodeId) {
- assert msgReader != null;
-
- commState.messageReader(msgReader, nodeId);
- }
-
- /**
* @param buf Byte buffer.
* @return Whether message was fully written.
*/