You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/23 09:05:35 UTC
ignite git commit: IGNITE-1961: Binary format: optimized sequential
reads.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.5 07f5a62ec -> 5dce6d982
IGNITE-1961: Binary format: optimized sequential reads.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/5dce6d98
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/5dce6d98
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/5dce6d98
Branch: refs/heads/ignite-1.5
Commit: 5dce6d982dd756555bdc5eb868bd8a3fa1359aeb
Parents: 07f5a62
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Mon Nov 23 11:06:23 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Mon Nov 23 11:06:23 2015 +0300
----------------------------------------------------------------------
.../internal/portable/BinaryReaderExImpl.java | 211 +++++++++++--------
1 file changed, 122 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/5dce6d98/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
index 5d31670..2534fd7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryReaderExImpl.java
@@ -129,15 +129,15 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
/** */
private final int start;
+ /** Start of actual data. Positioned right after the header. */
+ private final int dataStart;
+
/** Type ID. */
private final int typeId;
/** Raw offset. */
private final int rawOff;
- /** */
- private final int hdrLen;
-
/** Footer start. */
private final int footerStart;
@@ -241,12 +241,12 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
int clsNameLen = in.position() - off;
- hdrLen = DFLT_HDR_LEN + clsNameLen;
+ dataStart = start + DFLT_HDR_LEN + clsNameLen;
}
else {
typeId = typeId0;
- hdrLen = DFLT_HDR_LEN;
+ dataStart = start + DFLT_HDR_LEN;
}
idMapper = userType ? ctx.userTypeIdMapper(typeId) : null;
@@ -255,7 +255,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
else {
typeId = 0;
rawOff = 0;
- hdrLen = 0;
+ dataStart = 0;
footerStart = 0;
footerLen = 0;
idMapper = null;
@@ -266,7 +266,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
schema = null;
}
- in.position(start);
+ streamPosition(start);
}
/**
@@ -297,10 +297,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
* @throws BinaryObjectException In case of error.
*/
public Object unmarshal(int offset) throws BinaryObjectException {
- // Random reads prevent any further speculations.
- matching = false;
-
- in.position(offset);
+ streamPosition(offset);
return in.position() >= 0 ? unmarshal() : null;
}
@@ -395,18 +392,18 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
* @return Field.
*/
private <T> T readHandleField() {
- int handle = (in.position() - 1) - in.readInt();
+ int handlePos = positionForHandle() - in.readInt();
- int retPos = in.position();
-
- Object obj = rCtx.get(handle);
+ Object obj = rCtx.get(handlePos);
if (obj == null) {
- in.position(handle);
+ int retPos = in.position();
+
+ streamPosition(handlePos);
obj = doReadObject();
- in.position(retPos);
+ streamPosition(retPos);
}
return (T)obj;
@@ -1387,7 +1384,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
/** {@inheritDoc} */
@Override public BinaryRawReader rawReader() {
- in.position(rawOff);
+ streamPositionRandom(rawOff);
return this;
}
@@ -1405,43 +1402,51 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
case NULL:
return null;
- case HANDLE:
- int handle = start - in.readInt();
+ case HANDLE: {
+ int handlePos = start - in.readInt();
+
+ Object obj = rCtx.get(handlePos);
- BinaryObject handledPo = rCtx.get(handle);
+ if (obj == null) {
+ int retPos = in.position();
- if (handledPo != null)
- return handledPo;
+ streamPosition(handlePos);
- in.position(handle);
+ obj = unmarshal();
- return unmarshal();
+ streamPosition(retPos);
+ }
- case OBJ:
+ return obj;
+ }
+
+ case OBJ: {
PortableUtils.checkProtocolVersion(in.readByte());
+ int len = PortableUtils.length(in, start);
+
BinaryObjectEx po;
if (detach) {
- in.position(start + GridPortableMarshaller.TOTAL_LEN_POS);
-
- int len = in.readInt();
-
- in.position(start);
+ // In detach mode we simply copy object's content.
+ streamPosition(start);
po = new BinaryObjectImpl(ctx, in.readByteArray(len), 0);
}
- else
- po = in.offheapPointer() > 0
- ? new BinaryObjectOffheapImpl(ctx, in.offheapPointer(), start,
- in.remaining() + in.position())
- : new BinaryObjectImpl(ctx, in.array(), start);
+ else {
+ if (in.offheapPointer() == 0)
+ po = new BinaryObjectImpl(ctx, in.array(), start);
+ else
+ po = new BinaryObjectOffheapImpl(ctx, in.offheapPointer(), start,
+ in.remaining() + in.position());
- rCtx.put(start, po);
+ streamPosition(start + po.length());
+ }
- in.position(start + po.length());
+ rCtx.put(start, po);
return po;
+ }
case BYTE:
return in.readByte();
@@ -1546,20 +1551,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
return doReadClass();
case OPTM_MARSH:
- int len = in.readInt();
-
- ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), len);
-
- Object obj;
-
- try {
- obj = ctx.optimizedMarsh().unmarshal(input, null);
- }
- catch (IgniteCheckedException e) {
- throw new BinaryObjectException("Failed to unmarshal object with optmMarsh marshaller", e);
- }
-
- return obj;
+ return doReadOptimized();
default:
throw new BinaryObjectException("Invalid flag value: " + flag);
@@ -1592,11 +1584,13 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
return new String(doReadByteArray(), UTF_8);
int strLen = in.readInt();
- int strOff = in.position();
- String res = new String(in.array(), strOff, strLen, UTF_8);
+ int pos = in.position();
- in.position(strOff + strLen);
+ // String will copy necessary array part for us.
+ String res = new String(in.array(), pos, strLen, UTF_8);
+
+ streamPosition(pos + strLen);
return res;
}
@@ -1657,18 +1651,18 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
break;
case HANDLE:
- int handle = start - in.readInt();
+ int handlePos = start - in.readInt();
- obj = rCtx.get(handle);
+ obj = rCtx.get(handlePos);
if (obj == null) {
int retPos = in.position();
- in.position(handle);
+ streamPosition(handlePos);
obj = doReadObject();
- in.position(retPos);
+ streamPosition(retPos);
}
break;
@@ -1676,14 +1670,14 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
case OBJ:
PortableClassDescriptor desc = ctx.descriptorForTypeId(userType, typeId, ldr);
- in.position(start + hdrLen);
+ streamPosition(dataStart);
if (desc == null)
throw new BinaryInvalidTypeException("Unknown type ID: " + typeId);
obj = desc.read(this);
- in.position(footerStart + footerLen);
+ streamPosition(footerStart + footerLen);
break;
@@ -1863,18 +1857,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
break;
case OPTM_MARSH:
- int dataLen = in.readInt();
-
- ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), dataLen);
-
- try {
- obj = ctx.optimizedMarsh().unmarshal(input, null);
- }
- catch (IgniteCheckedException e) {
- throw new BinaryObjectException("Failed to unmarshal object with optimized marshaller", e);
- }
-
- in.position(in.position() + dataLen);
+ obj = doReadOptimized();
break;
@@ -1886,6 +1869,27 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
}
/**
+ * Read object serialized using optimized marshaller.
+ *
+ * @return Result.
+ */
+ private Object doReadOptimized() {
+ int len = in.readInt();
+
+ ByteArrayInputStream input = new ByteArrayInputStream(in.array(), in.position(), len);
+
+ try {
+ return ctx.optimizedMarsh().unmarshal(input, null);
+ }
+ catch (IgniteCheckedException e) {
+ throw new BinaryObjectException("Failed to unmarshal object with optimized marshaller", e);
+ }
+ finally {
+ streamPosition(in.position() + len);
+ }
+ }
+
+ /**
* @return Value.
*/
private byte[] doReadByteArray() {
@@ -2297,7 +2301,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
int pos = in.position();
- in.position(in.position() + len);
+ streamPosition(in.position() + len);
int start = in.readInt();
@@ -2479,7 +2483,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
* @return Offset.
*/
public boolean findFieldByName(String name) {
- assert hdrLen != 0;
+ assert dataStart != start;
if (footerLen == 0)
return false;
@@ -2495,9 +2499,11 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
switch (confirm) {
case CONFIRMED:
// The best case: got order without ID calculation and (ID -> order) lookup.
- order = expOrder;
+ if (expOrder == 0)
+ // When we read the very first field, position is set to start, hence this re-positioning.
+ streamPosition(dataStart);
- break;
+ return true;
case REJECTED:
// Rejected, no more speculations are possible. Fallback to the slowest scenario.
@@ -2518,7 +2524,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
// IDs matched, cache field name inside schema.
schema.clarifyFieldName(expOrder, name);
- order = expOrder;
+ if (expOrder == 0)
+ streamPosition(dataStart);
+
+ return true;
}
else {
// No match, stop further speculations.
@@ -2544,10 +2553,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
* (string -> ID) calculations.
*
* @param id Field ID.
- * @return Offset.
+ * @return {@code True} if field was found and stream was positioned accordingly.
*/
private boolean findFieldById(int id) {
- assert hdrLen != 0;
+ assert dataStart != start;
if (footerLen == 0)
return false;
@@ -2561,8 +2570,12 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
int realId = schema.fieldId(expOrder);
- if (realId == id)
- order = expOrder;
+ if (realId == id) {
+ if (expOrder == 0)
+ streamPosition(dataStart);
+
+ return true;
+ }
else {
// Mismatch detected, no need for further speculations.
matching = false;
@@ -2580,10 +2593,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
}
/**
- * Set position for the given user field order and return it.
+ * Set position for the given user field order.
*
* @param order Order.
- * @return Position.
+ * @return {@code True} if field was found and stream was positioned accordingly.
*/
private boolean trySetUserFieldPosition(int order) {
if (order != PortableSchema.ORDER_NOT_FOUND) {
@@ -2591,7 +2604,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
int pos = start + PortableUtils.fieldOffsetRelative(in, offsetPos, fieldOffsetLen);
- in.position(pos);
+ streamPosition(pos);
return true;
}
@@ -2600,10 +2613,10 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
}
/**
- * Set position for the given system field ID and return it.
+ * Set position for the given system field ID.
*
* @param id Field ID.
- * @return Position.
+ * @return {@code True} if field was found and stream was positioned accordingly.
*/
private boolean trySetSystemFieldPosition(int id) {
// System types are never written with compact footers because they do not have metadata.
@@ -2622,7 +2635,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
int pos = start + PortableUtils.fieldOffsetRelative(in, searchPos + PortableUtils.FIELD_ID_LEN,
fieldOffsetLen);
- in.position(pos);
+ streamPosition(pos);
return true;
}
@@ -2631,6 +2644,26 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
}
}
+ /**
+ * Set stream position.
+ *
+ * @param pos Position.
+ */
+ private void streamPosition(int pos) {
+ in.position(pos);
+ }
+
+ /**
+ * Set stream position as a part of some random read. Further speculations will be disabled after this call.
+ *
+ * @param pos Position.
+ */
+ private void streamPositionRandom(int pos) {
+ streamPosition(pos);
+
+ matching = false;
+ }
+
/** {@inheritDoc} */
@Override public int readUnsignedByte() throws IOException {
return readByte() & 0xff;
@@ -2697,7 +2730,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Obje
@Override public int skipBytes(int n) throws IOException {
int toSkip = Math.min(in.remaining(), n);
- in.position(in.position() + toSkip);
+ streamPositionRandom(in.position() + toSkip);
return toSkip;
}