You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by dm...@apache.org on 2015/12/14 10:02:05 UTC

[38/40] ignite git commit: IGNITE-2105 - Fixed nested collections in direct marshalling

IGNITE-2105 - Fixed nested collections in direct marshalling


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/6c61598b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/6c61598b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/6c61598b

Branch: refs/heads/master
Commit: 6c61598bc66594e535af4fb10f34abe6797b72c0
Parents: 10b83fb
Author: Valentin Kulichenko <va...@gmail.com>
Authored: Fri Dec 11 17:00:25 2015 -0800
Committer: Valentin Kulichenko <va...@gmail.com>
Committed: Fri Dec 11 17:00:25 2015 -0800

----------------------------------------------------------------------
 .../internal/direct/DirectMessageWriter.java    | 108 ++++++++++++++-----
 .../ignite/util/GridMessageCollectionTest.java  |  34 ++++--
 2 files changed, 111 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/6c61598b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
index ad122ba..085cf68 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/direct/DirectMessageWriter.java
@@ -38,44 +38,29 @@ import org.jetbrains.annotations.Nullable;
  * Message writer implementation.
  */
 public class DirectMessageWriter implements MessageWriter {
-    /** Stream. */
-    private final DirectByteBufferStream stream;
-
     /** State. */
-    private final DirectMessageState<StateItem> state = new DirectMessageState<>(StateItem.class,
-        new IgniteOutClosure<StateItem>() {
-            @Override public StateItem apply() {
-                return new StateItem();
-            }
-        });
+    private final DirectMessageState<StateItem> state;
 
     /**
      * @param protoVer Protocol version.
      */
-    public DirectMessageWriter(byte protoVer) {
-        switch (protoVer) {
-            case 1:
-                stream = new DirectByteBufferStreamImplV1(null);
-
-                break;
-
-            case 2:
-                stream = new DirectByteBufferStreamImplV2(null);
-
-                break;
-
-            default:
-                throw new IllegalStateException("Invalid protocol version: " + protoVer);
-        }
+    public DirectMessageWriter(final byte protoVer) {
+        state = new DirectMessageState<>(StateItem.class, new IgniteOutClosure<StateItem>() {
+            @Override public StateItem apply() {
+                return new StateItem(protoVer);
+            }
+        });
     }
 
     /** {@inheritDoc} */
     @Override public void setBuffer(ByteBuffer buf) {
-        stream.setBuffer(buf);
+        state.item().stream.setBuffer(buf);
     }
 
     /** {@inheritDoc} */
     @Override public boolean writeHeader(byte type, byte fieldCnt) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeByte(type);
 
         return stream.lastFinished();
@@ -83,6 +68,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeByte(String name, byte val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeByte(val);
 
         return stream.lastFinished();
@@ -90,6 +77,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeShort(String name, short val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeShort(val);
 
         return stream.lastFinished();
@@ -97,6 +86,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeInt(String name, int val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeInt(val);
 
         return stream.lastFinished();
@@ -104,6 +95,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeLong(String name, long val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeLong(val);
 
         return stream.lastFinished();
@@ -111,6 +104,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeFloat(String name, float val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeFloat(val);
 
         return stream.lastFinished();
@@ -118,6 +113,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeDouble(String name, double val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeDouble(val);
 
         return stream.lastFinished();
@@ -125,6 +122,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeChar(String name, char val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeChar(val);
 
         return stream.lastFinished();
@@ -132,6 +131,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeBoolean(String name, boolean val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeBoolean(val);
 
         return stream.lastFinished();
@@ -139,6 +140,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeByteArray(String name, @Nullable byte[] val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeByteArray(val);
 
         return stream.lastFinished();
@@ -146,6 +149,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeByteArray(String name, byte[] val, long off, int len) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeByteArray(val, off, len);
 
         return stream.lastFinished();
@@ -153,6 +158,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeShortArray(String name, @Nullable short[] val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeShortArray(val);
 
         return stream.lastFinished();
@@ -160,6 +167,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeIntArray(String name, @Nullable int[] val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeIntArray(val);
 
         return stream.lastFinished();
@@ -167,6 +176,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeLongArray(String name, @Nullable long[] val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeLongArray(val);
 
         return stream.lastFinished();
@@ -174,6 +185,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeFloatArray(String name, @Nullable float[] val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeFloatArray(val);
 
         return stream.lastFinished();
@@ -181,6 +194,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeDoubleArray(String name, @Nullable double[] val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeDoubleArray(val);
 
         return stream.lastFinished();
@@ -188,6 +203,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeCharArray(String name, @Nullable char[] val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeCharArray(val);
 
         return stream.lastFinished();
@@ -195,6 +212,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeBooleanArray(String name, @Nullable boolean[] val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeBooleanArray(val);
 
         return stream.lastFinished();
@@ -202,6 +221,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeString(String name, String val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeString(val);
 
         return stream.lastFinished();
@@ -209,6 +230,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeBitSet(String name, BitSet val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeBitSet(val);
 
         return stream.lastFinished();
@@ -216,6 +239,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeUuid(String name, UUID val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeUuid(val);
 
         return stream.lastFinished();
@@ -223,6 +248,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeIgniteUuid(String name, IgniteUuid val) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeIgniteUuid(val);
 
         return stream.lastFinished();
@@ -230,6 +257,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public boolean writeMessage(String name, @Nullable Message msg) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeMessage(msg, this);
 
         return stream.lastFinished();
@@ -237,6 +266,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public <T> boolean writeObjectArray(String name, T[] arr, MessageCollectionItemType itemType) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeObjectArray(arr, itemType, this);
 
         return stream.lastFinished();
@@ -244,6 +275,8 @@ public class DirectMessageWriter implements MessageWriter {
 
     /** {@inheritDoc} */
     @Override public <T> boolean writeCollection(String name, Collection<T> col, MessageCollectionItemType itemType) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeCollection(col, itemType, this);
 
         return stream.lastFinished();
@@ -252,6 +285,8 @@ public class DirectMessageWriter implements MessageWriter {
     /** {@inheritDoc} */
     @Override public <K, V> boolean writeMap(String name, Map<K, V> map, MessageCollectionItemType keyType,
         MessageCollectionItemType valType) {
+        DirectByteBufferStream stream = state.item().stream;
+
         stream.writeMap(map, keyType, valType, this);
 
         return stream.lastFinished();
@@ -296,11 +331,34 @@ public class DirectMessageWriter implements MessageWriter {
      */
     private static class StateItem implements DirectMessageStateItem {
         /** */
+        private final DirectByteBufferStream stream;
+
+        /** */
         private int state;
 
         /** */
         private boolean hdrWritten;
 
+        /**
+         * @param protoVer Protocol version.
+         */
+        public StateItem(byte protoVer) {
+            switch (protoVer) {
+                case 1:
+                    stream = new DirectByteBufferStreamImplV1(null);
+
+                    break;
+
+                case 2:
+                    stream = new DirectByteBufferStreamImplV2(null);
+
+                    break;
+
+                default:
+                    throw new IllegalStateException("Invalid protocol version: " + protoVer);
+            }
+        }
+
         /** {@inheritDoc} */
         @Override public void reset() {
             state = 0;

http://git-wip-us.apache.org/repos/asf/ignite/blob/6c61598b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
index e910a8a..44df767 100644
--- a/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/util/GridMessageCollectionTest.java
@@ -24,6 +24,9 @@ import org.apache.ignite.internal.direct.DirectMessageWriter;
 import org.apache.ignite.internal.managers.communication.GridIoMessageFactory;
 import org.apache.ignite.internal.util.UUIDCollectionMessage;
 import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageFactory;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
 
 import static java.util.UUID.randomUUID;
 import static org.apache.ignite.internal.util.GridMessageCollection.of;
@@ -36,6 +39,23 @@ public class GridMessageCollectionTest extends TestCase {
     private byte proto;
 
     /**
+     * @param proto Protocol version.
+     * @return Writer.
+     */
+    protected MessageWriter writer(byte proto) {
+        return new DirectMessageWriter(proto);
+    }
+
+    /**
+     * @param msgFactory Message factory.
+     * @param proto Protocol version.
+     * @return Writer.
+     */
+    protected MessageReader reader(MessageFactory msgFactory, byte proto) {
+        return new DirectMessageReader(msgFactory, proto);
+    }
+
+    /**
      *
      */
     public void testMarshal() {
@@ -88,17 +108,19 @@ public class GridMessageCollectionTest extends TestCase {
     private void doTestMarshal(Message m) {
         ByteBuffer buf = ByteBuffer.allocate(8 * 1024);
 
-        DirectMessageWriter w = new DirectMessageWriter(proto);
-
-        m.writeTo(buf, w);
+        m.writeTo(buf, writer(proto));
 
         buf.flip();
 
-        DirectMessageReader r = new DirectMessageReader(new GridIoMessageFactory(null), proto);
+        byte type = buf.get();
+
+        assertEquals(m.directType(), type);
+
+        GridIoMessageFactory msgFactory = new GridIoMessageFactory(null);
 
-        r.setBuffer(buf);
+        Message mx = msgFactory.create(type);
 
-        Message mx = r.readMessage(null);
+        mx.readFrom(buf, reader(msgFactory, proto));
 
         assertEquals(m, mx);
     }