You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/23 09:05:35 UTC

ignite git commit: IGNITE-1961: Binary format: optimized sequential reads.

Repository: ignite
Updated Branches:
  refs/heads/ignite-1.5 07f5a62ec -> 5dce6d982


IGNITE-1961: Binary format: optimized sequential reads.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5dce6d98
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5dce6d98
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5dce6d98

Branch: refs/heads/ignite-1.5
Commit: 5dce6d982dd756555bdc5eb868bd8a3fa1359aeb
Parents: 07f5a62
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Nov 23 11:06:23 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Nov 23 11:06:23 2015 +0300

----------------------------------------------------------------------
 .../internal/portable/BinaryReaderExImpl.java   | 211 +++++++++++--------
 1 file changed, 122 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/5dce6d98/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
index 5d31670..2534fd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
@@ -129,15 +129,15 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
     /** */
     private final int start;
 
+    /** Start of actual data. Positioned right after the header. */
+    private final int dataStart;
+
     /** Type ID. */
     private final int typeId;
 
     /** Raw offset. */
     private final int rawOff;
 
-    /** */
-    private final int hdrLen;
-
     /** Footer start. */
     private final int footerStart;
 
@@ -241,12 +241,12 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
 
                 int clsNameLen = in.position() - off;
 
-                hdrLen = DFLT_HDR_LEN + clsNameLen;
+                dataStart = start + DFLT_HDR_LEN + clsNameLen;
             }
             else {
                 typeId = typeId0;
 
-                hdrLen = DFLT_HDR_LEN;
+                dataStart = start + DFLT_HDR_LEN;
             }
 
             idMapper = userType ? ctx.userTypeIdMapper(typeId) : null;
@@ -255,7 +255,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
         else {
             typeId = 0;
             rawOff = 0;
-            hdrLen = 0;
+            dataStart = 0;
             footerStart = 0;
             footerLen = 0;
             idMapper = null;
@@ -266,7 +266,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
             schema = null;
         }
 
-        in.position(start);
+        streamPosition(start);
     }
 
     /**
@@ -297,10 +297,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
      * @throws BinaryObjectException In case of error.
      */
     public Object unmarshal(int offset) throws BinaryObjectException {
-        // Random reads prevent any further speculations.
-        matching = false;
-
-        in.position(offset);
+        streamPosition(offset);
 
         return in.position() >= 0 ? unmarshal() : null;
     }
@@ -395,18 +392,18 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
      * @return Field.
      */
     private <T> T readHandleField() {
-        int handle = (in.position() - 1) - in.readInt();
+        int handlePos = positionForHandle() - in.readInt();
 
-        int retPos = in.position();
-
-        Object obj = rCtx.get(handle);
+        Object obj = rCtx.get(handlePos);
 
         if (obj == null) {
-            in.position(handle);
+            int retPos = in.position();
+
+            streamPosition(handlePos);
 
             obj = doReadObject();
 
-            in.position(retPos);
+            streamPosition(retPos);
         }
 
         return (T)obj;
@@ -1387,7 +1384,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
 
     /** {@inheritDoc} */
     @Override public BinaryRawReader rawReader() {
-        in.position(rawOff);
+        streamPositionRandom(rawOff);
 
         return this;
     }
@@ -1405,43 +1402,51 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
             case NULL:
                 return null;
 
-            case HANDLE:
-                int handle = start - in.readInt();
+            case HANDLE: {
+                int handlePos = start - in.readInt();
+
+                Object obj = rCtx.get(handlePos);
 
-                BinaryObject handledPo = rCtx.get(handle);
+                if (obj == null) {
+                    int retPos = in.position();
 
-                if (handledPo != null)
-                    return handledPo;
+                    streamPosition(handlePos);
 
-                in.position(handle);
+                    obj = unmarshal();
 
-                return unmarshal();
+                    streamPosition(retPos);
+                }
 
-            case OBJ:
+                return obj;
+            }
+
+            case OBJ: {
                 PortableUtils.checkProtocolVersion(in.readByte());
 
+                int len = PortableUtils.length(in, start);
+
                 BinaryObjectEx po;
 
                 if (detach) {
-                    in.position(start + GridPortableMarshaller.TOTAL_LEN_POS);
-
-                    int len = in.readInt();
-
-                    in.position(start);
+                    // In detach mode we simply copy object's content.
+                    streamPosition(start);
 
                     po = new BinaryObjectImpl(ctx, in.readByteArray(len), 0);
                 }
-                else
-                    po = in.offheapPointer() > 0
-                        ? new BinaryObjectOffheapImpl(ctx, in.offheapPointer(), start,
-                        in.remaining() + in.position())
-                        : new BinaryObjectImpl(ctx, in.array(), start);
+                else {
+                    if (in.offheapPointer() == 0)
+                        po = new BinaryObjectImpl(ctx, in.array(), start);
+                    else
+                        po = new BinaryObjectOffheapImpl(ctx, in.offheapPointer(), start,
+                            in.remaining() + in.position());
 
-                rCtx.put(start, po);
+                    streamPosition(start + po.length());
+                }
 
-                in.position(start + po.length());
+                rCtx.put(start, po);
 
                 return po;
+            }
 
             case BYTE:
                 return in.readByte();
@@ -1546,20 +1551,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
                 return doReadClass();
 
             case OPTM_MARSH:
-                int len = in.readInt();
-
-                ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), len);
-
-                Object obj;
-
-                try {
-                    obj = ctx.optimizedMarsh().unmarshal(input, null);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new BinaryObjectException("Failed to unmarshal object with optmMarsh marshaller", e);
-                }
-
-                return obj;
+                return doReadOptimized();
 
             default:
                 throw new BinaryObjectException("Invalid flag value: " + flag);
@@ -1592,11 +1584,13 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
             return new String(doReadByteArray(), UTF_8);
 
         int strLen = in.readInt();
-        int strOff = in.position();
 
-        String res = new String(in.array(), strOff, strLen, UTF_8);
+        int pos = in.position();
 
-        in.position(strOff + strLen);
+        // String will copy necessary array part for us.
+        String res = new String(in.array(), pos, strLen, UTF_8);
+
+        streamPosition(pos + strLen);
 
         return res;
     }
@@ -1657,18 +1651,18 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
                 break;
 
             case HANDLE:
-                int handle = start - in.readInt();
+                int handlePos = start - in.readInt();
 
-                obj = rCtx.get(handle);
+                obj = rCtx.get(handlePos);
 
                 if (obj == null) {
                     int retPos = in.position();
 
-                    in.position(handle);
+                    streamPosition(handlePos);
 
                     obj = doReadObject();
 
-                    in.position(retPos);
+                    streamPosition(retPos);
                 }
 
                 break;
@@ -1676,14 +1670,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
             case OBJ:
                 PortableClassDescriptor desc = ctx.descriptorForTypeId(userType, typeId, ldr);
 
-                in.position(start + hdrLen);
+                streamPosition(dataStart);
 
                 if (desc == null)
                     throw new BinaryInvalidTypeException("Unknown type ID: " + typeId);
 
                 obj = desc.read(this);
 
-                in.position(footerStart + footerLen);
+                streamPosition(footerStart + footerLen);
 
                 break;
 
@@ -1863,18 +1857,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
                 break;
 
             case OPTM_MARSH:
-                int dataLen = in.readInt();
-
-                ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), dataLen);
-
-                try {
-                    obj = ctx.optimizedMarsh().unmarshal(input, null);
-                }
-                catch (IgniteCheckedException e) {
-                    throw new BinaryObjectException("Failed to unmarshal object with optimized marshaller", e);
-                }
-
-                in.position(in.position() + dataLen);
+                obj = doReadOptimized();
 
                 break;
 
@@ -1886,6 +1869,27 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
     }
 
     /**
+     * Read object serialized using optimized marshaller.
+     *
+     * @return Result.
+     */
+    private Object doReadOptimized() {
+        int len = in.readInt();
+
+        ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), len);
+
+        try {
+            return ctx.optimizedMarsh().unmarshal(input, null);
+        }
+        catch (IgniteCheckedException e) {
+            throw new BinaryObjectException("Failed to unmarshal object with optimized marshaller", e);
+        }
+        finally {
+            streamPosition(in.position() + len);
+        }
+    }
+
+    /**
      * @return Value.
      */
     private byte[] doReadByteArray() {
@@ -2297,7 +2301,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
 
             int pos = in.position();
 
-            in.position(in.position() + len);
+            streamPosition(in.position() + len);
 
             int start = in.readInt();
 
@@ -2479,7 +2483,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
      * @return Offset.
      */
     public boolean findFieldByName(String name) {
-        assert hdrLen != 0;
+        assert dataStart != start;
 
         if (footerLen == 0)
             return false;
@@ -2495,9 +2499,11 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
                 switch (confirm) {
                     case CONFIRMED:
                         // The best case: got order without ID calculation and (ID -> order) lookup.
-                        order = expOrder;
+                        if (expOrder == 0)
+                            // When we read the very first field, position is set to start, hence this re-positioning.
+                            streamPosition(dataStart);
 
-                        break;
+                        return true;
 
                     case REJECTED:
                         // Rejected, no more speculations are possible. Fallback to the slowest scenario.
@@ -2518,7 +2524,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
                             // IDs matched, cache field name inside schema.
                             schema.clarifyFieldName(expOrder, name);
 
-                            order = expOrder;
+                            if (expOrder == 0)
+                                streamPosition(dataStart);
+
+                            return true;
                         }
                         else {
                             // No match, stop further speculations.
@@ -2544,10 +2553,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
      * (string -> ID) calculations.
      *
      * @param id Field ID.
-     * @return Offset.
+     * @return {@code True} if field was found and stream was positioned accordingly.
      */
     private boolean findFieldById(int id) {
-        assert hdrLen != 0;
+        assert dataStart != start;
 
         if (footerLen == 0)
             return false;
@@ -2561,8 +2570,12 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
 
                 int realId = schema.fieldId(expOrder);
 
-                if (realId == id)
-                    order = expOrder;
+                if (realId == id) {
+                    if (expOrder == 0)
+                        streamPosition(dataStart);
+
+                    return true;
+                }
                 else {
                     // Mismatch detected, no need for further speculations.
                     matching = false;
@@ -2580,10 +2593,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
     }
 
     /**
-     * Set position for the given user field order and return it.
+     * Set position for the given user field order.
      *
      * @param order Order.
-     * @return Position.
+     * @return {@code True} if field was found and stream was positioned accordingly.
      */
     private boolean trySetUserFieldPosition(int order) {
         if (order != PortableSchema.ORDER_NOT_FOUND) {
@@ -2591,7 +2604,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
 
             int pos = start + PortableUtils.fieldOffsetRelative(in, offsetPos, fieldOffsetLen);
 
-            in.position(pos);
+            streamPosition(pos);
 
             return true;
         }
@@ -2600,10 +2613,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
     }
 
     /**
-     * Set position for the given system field ID and return it.
+     * Set position for the given system field ID.
      *
      * @param id Field ID.
-     * @return Position.
+     * @return {@code True} if field was found and stream was positioned accordingly.
      */
     private boolean trySetSystemFieldPosition(int id) {
         // System types are never written with compact footers because they do not have metadata.
@@ -2622,7 +2635,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
                 int pos = start + PortableUtils.fieldOffsetRelative(in, searchPos + PortableUtils.FIELD_ID_LEN,
                     fieldOffsetLen);
 
-                in.position(pos);
+                streamPosition(pos);
 
                 return true;
             }
@@ -2631,6 +2644,26 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
         }
     }
 
+    /**
+     * Set stream position.
+     *
+     * @param pos Position.
+     */
+    private void streamPosition(int pos) {
+        in.position(pos);
+    }
+
+    /**
+     * Set stream position as a part of some random read. Further speculations will be disabled after this call.
+     *
+     * @param pos Position.
+     */
+    private void streamPositionRandom(int pos) {
+        streamPosition(pos);
+
+        matching = false;
+    }
+
     /** {@inheritDoc} */
     @Override public int readUnsignedByte() throws IOException {
         return readByte() & 0xff;
@@ -2697,7 +2730,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
     @Override public int skipBytes(int n) throws IOException {
         int toSkip = Math.min(in.remaining(), n);
 
-        in.position(in.position() + toSkip);
+        streamPositionRandom(in.position() + toSkip);
 
         return toSkip;
     }