You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/12/14 10:02:05 UTC
[38/40] ignite git commit: IGNITE-2105 - Fixed nested collections in
direct marshalling
IGNITE-2105 - Fixed nested collections in direct marshalling
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c61598b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c61598b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c61598b
Branch: refs/heads/master
Commit: 6c61598bc66594e535af4fb10f34abe6797b72c0
Parents: 10b83fb
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Fri Dec 11 17:00:25 2015 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Dec 11 17:00:25 2015 -0800
----------------------------------------------------------------------
.../internal/direct/DirectMessageWriter.java | 108 ++++++++++++++-----
.../ignite/util/GridMessageCollectionTest.java | 34 ++++--
2 files changed, 111 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c61598b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index ad122ba..085cf68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -38,44 +38,29 @@ import org.jetbrains.annotations.Nullable;
* Message writer implementation.
*/
public class DirectMessageWriter implements MessageWriter {
- /** Stream. */
- private final DirectByteBufferStream stream;
-
/** State. */
- private final DirectMessageState<StateItem> state = new DirectMessageState<>(StateItem.class,
- new IgniteOutClosure<StateItem>() {
- @Override public StateItem apply() {
- return new StateItem();
- }
- });
+ private final DirectMessageState<StateItem> state;
/**
* @param protoVer Protocol version.
*/
- public DirectMessageWriter(byte protoVer) {
- switch (protoVer) {
- case 1:
- stream = new DirectByteBufferStreamImplV1(null);
-
- break;
-
- case 2:
- stream = new DirectByteBufferStreamImplV2(null);
-
- break;
-
- default:
- throw new IllegalStateException("Invalid protocol version: " + protoVer);
- }
+ public DirectMessageWriter(final byte protoVer) {
+ state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
+ @Override public StateItem apply() {
+ return new StateItem(protoVer);
+ }
+ });
}
/** {@inheritDoc} */
@Override public void setBuffer(ByteBuffer buf) {
- stream.setBuffer(buf);
+ state.item().stream.setBuffer(buf);
}
/** {@inheritDoc} */
@Override public boolean writeHeader(byte type, byte fieldCnt) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeByte(type);
return stream.lastFinished();
@@ -83,6 +68,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeByte(String name, byte val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeByte(val);
return stream.lastFinished();
@@ -90,6 +77,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeShort(String name, short val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeShort(val);
return stream.lastFinished();
@@ -97,6 +86,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeInt(String name, int val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeInt(val);
return stream.lastFinished();
@@ -104,6 +95,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeLong(String name, long val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeLong(val);
return stream.lastFinished();
@@ -111,6 +104,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeFloat(String name, float val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeFloat(val);
return stream.lastFinished();
@@ -118,6 +113,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeDouble(String name, double val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeDouble(val);
return stream.lastFinished();
@@ -125,6 +122,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeChar(String name, char val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeChar(val);
return stream.lastFinished();
@@ -132,6 +131,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeBoolean(String name, boolean val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeBoolean(val);
return stream.lastFinished();
@@ -139,6 +140,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeByteArray(String name, @Nullable byte[] val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeByteArray(val);
return stream.lastFinished();
@@ -146,6 +149,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeByteArray(String name, byte[] val, long off, int len) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeByteArray(val, off, len);
return stream.lastFinished();
@@ -153,6 +158,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeShortArray(String name, @Nullable short[] val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeShortArray(val);
return stream.lastFinished();
@@ -160,6 +167,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeIntArray(String name, @Nullable int[] val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeIntArray(val);
return stream.lastFinished();
@@ -167,6 +176,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeLongArray(String name, @Nullable long[] val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeLongArray(val);
return stream.lastFinished();
@@ -174,6 +185,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeFloatArray(String name, @Nullable float[] val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeFloatArray(val);
return stream.lastFinished();
@@ -181,6 +194,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeDoubleArray(String name, @Nullable double[] val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeDoubleArray(val);
return stream.lastFinished();
@@ -188,6 +203,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeCharArray(String name, @Nullable char[] val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeCharArray(val);
return stream.lastFinished();
@@ -195,6 +212,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeBooleanArray(String name, @Nullable boolean[] val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeBooleanArray(val);
return stream.lastFinished();
@@ -202,6 +221,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeString(String name, String val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeString(val);
return stream.lastFinished();
@@ -209,6 +230,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeBitSet(String name, BitSet val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeBitSet(val);
return stream.lastFinished();
@@ -216,6 +239,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeUuid(String name, UUID val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeUuid(val);
return stream.lastFinished();
@@ -223,6 +248,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeIgniteUuid(String name, IgniteUuid val) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeIgniteUuid(val);
return stream.lastFinished();
@@ -230,6 +257,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public boolean writeMessage(String name, @Nullable Message msg) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeMessage(msg, this);
return stream.lastFinished();
@@ -237,6 +266,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeObjectArray(arr, itemType, this);
return stream.lastFinished();
@@ -244,6 +275,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeCollection(col, itemType, this);
return stream.lastFinished();
@@ -252,6 +285,8 @@ public class DirectMessageWriter implements MessageWriter {
/** {@inheritDoc} */
@Override public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType,
MessageCollectionItemType valType) {
+ DirectByteBufferStream stream = state.item().stream;
+
stream.writeMap(map, keyType, valType, this);
return stream.lastFinished();
@@ -296,11 +331,34 @@ public class DirectMessageWriter implements MessageWriter {
*/
private static class StateItem implements DirectMessageStateItem {
/** */
+ private final DirectByteBufferStream stream;
+
+ /** */
private int state;
/** */
private boolean hdrWritten;
+ /**
+ * @param protoVer Protocol version.
+ */
+ public StateItem(byte protoVer) {
+ switch (protoVer) {
+ case 1:
+ stream = new DirectByteBufferStreamImplV1(null);
+
+ break;
+
+ case 2:
+ stream = new DirectByteBufferStreamImplV2(null);
+
+ break;
+
+ default:
+ throw new IllegalStateException("Invalid protocol version: " + protoVer);
+ }
+ }
+
/** {@inheritDoc} */
@Override public void reset() {
state = 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/6c61598b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
index e910a8a..44df767 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
@@ -24,6 +24,9 @@ import org.apache.ignite.internal.direct.DirectMessageWriter;
import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
import org.apache.ignite.internal.util.UUIDCollectionMessage;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
import static java.util.UUID.randomUUID;
import static org.apache.ignite.internal.util.GridMessageCollection.of;
@@ -36,6 +39,23 @@ public class GridMessageCollectionTest extends TestCase {
private byte proto;
/**
+ * @param proto Protocol version.
+ * @return Writer.
+ */
+ protected MessageWriter writer(byte proto) {
+ return new DirectMessageWriter(proto);
+ }
+
+ /**
+ * @param msgFactory Message factory.
+ * @param proto Protocol version.
+ * @return Writer.
+ */
+ protected MessageReader reader(MessageFactory msgFactory, byte proto) {
+ return new DirectMessageReader(msgFactory, proto);
+ }
+
+ /**
*
*/
public void testMarshal() {
@@ -88,17 +108,19 @@ public class GridMessageCollectionTest extends TestCase {
private void doTestMarshal(Message m) {
ByteBuffer buf = ByteBuffer.allocate(8 * 1024);
- DirectMessageWriter w = new DirectMessageWriter(proto);
-
- m.writeTo(buf, w);
+ m.writeTo(buf, writer(proto));
buf.flip();
- DirectMessageReader r = new DirectMessageReader(new GridIoMessageFactory(null), proto);
+ byte type = buf.get();
+
+ assertEquals(m.directType(), type);
+
+ GridIoMessageFactory msgFactory = new GridIoMessageFactory(null);
- r.setBuffer(buf);
+ Message mx = msgFactory.create(type);
- Message mx = r.readMessage(null);
+ mx.readFrom(buf, reader(msgFactory, proto));
assertEquals(m, mx);
}