You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2015/10/30 16:52:39 UTC
[43/52] [abbrv] ignite git commit: IGNITE-1770: Implemented
constant-time field lookup on protocol level.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java
index 2140bee..eafcbd1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java
@@ -18,7 +18,9 @@
package org.apache.ignite.internal.portable;
import org.apache.ignite.internal.portable.builder.PortableLazyValue;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.portable.PortableException;
import org.apache.ignite.portable.PortableObject;
import org.jetbrains.annotations.Nullable;
@@ -77,7 +79,7 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.UUID_ARR;
/**
- *
+ * Portable utils.
*/
public class PortableUtils {
/** */
@@ -92,6 +94,47 @@ public class PortableUtils {
/** Portable classes. */
private static final Collection<Class<?>> PORTABLE_CLS = new HashSet<>();
+ /** Flag: user type. */
+ public static final short FLAG_USR_TYP = 0x1;
+
+ /** Flag: only raw data exists. */
+ public static final short FLAG_RAW_ONLY = 0x2;
+
+ /**
+ * Write flags.
+ *
+ * @param writer Writer.
+ * @param userType User type flag.
+ */
+ public static void writeFlags(PortableWriterExImpl writer, boolean userType) {
+ short val = 0;
+
+ if (userType)
+ val |= FLAG_USR_TYP;
+
+ writer.doWriteShort(val);
+ }
+
+ /**
+ * Check if user type flag is set.
+ *
+ * @param flags Flags.
+ * @return {@code True} if set.
+ */
+ public static boolean isUserType(short flags) {
+ return (flags & FLAG_USR_TYP) == FLAG_USR_TYP;
+ }
+
+ /**
+ * Check if raw-only flag is set.
+ *
+ * @param flags Flags.
+ * @return {@code True} if set.
+ */
+ public static boolean isRawOnly(short flags) {
+ return (flags & FLAG_RAW_ONLY) == FLAG_RAW_ONLY;
+ }
+
/**
*
*/
@@ -487,4 +530,118 @@ public class PortableUtils {
if (PROTO_VER != protoVer)
throw new PortableException("Unsupported protocol version: " + protoVer);
}
+
+ /**
+ * Write portable header.
+ *
+ * @param writer Writer.
+ * @param usrTyp User type flag.
+ * @param typeId Type ID.
+ * @param hashCode Hash code.
+ * @param clsName Class name (optional).
+ * @return Position where length should be written.
+ */
+ public static int writeHeader(PortableWriterExImpl writer, boolean usrTyp, int typeId, int hashCode,
+ @Nullable String clsName) {
+ writer.doWriteByte(GridPortableMarshaller.OBJ);
+ writer.doWriteByte(GridPortableMarshaller.PROTO_VER);
+
+ PortableUtils.writeFlags(writer, usrTyp);
+
+ writer.doWriteInt(typeId);
+ writer.doWriteInt(hashCode);
+
+ int reserved = writer.reserve(12);
+
+ if (clsName != null)
+ writer.doWriteString(clsName);
+
+ return reserved;
+ }
+
+ /**
+ * Get portable object length.
+ *
+ * @param in Input stream.
+ * @param start Start position.
+ * @return Length.
+ */
+ public static int length(PortablePositionReadable in, int start) {
+ return in.readIntPositioned(start + GridPortableMarshaller.TOTAL_LEN_POS);
+ }
+
+ /**
+ * Get footer start of the object.
+ *
+ * @param in Input stream.
+ * @param start Object start position inside the stream.
+ * @return Footer start.
+ */
+ public static int footerStartRelative(PortablePositionReadable in, int start) {
+ short flags = in.readShortPositioned(start + GridPortableMarshaller.FLAGS_POS);
+
+ if (PortableUtils.isRawOnly(flags))
+ // No schema, footer start equals the object end.
+ return length(in, start);
+ else
+ // Schema exists, use offset.
+ return in.readIntPositioned(start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS);
+ }
+
+ /**
+ * Get absolute footer start position of the object.
+ *
+ * @param in Input stream.
+ * @param start Start position.
+ * @return Footer start.
+ */
+ public static int footerStartAbsolute(PortablePositionReadable in, int start) {
+ return footerStartRelative(in, start) + start;
+ }
+
+ /**
+ * Get object's footer.
+ *
+ * @param in Input stream.
+ * @param start Start position.
+ * @return Footer as a tuple of absolute start and end positions.
+ */
+ public static IgniteBiTuple<Integer, Integer> footerAbsolute(PortablePositionReadable in, int start) {
+ int footerStart = footerStartRelative(in, start);
+ int footerEnd = length(in, start);
+
+ // Take into account possible raw offset.
+ if ((((footerEnd - footerStart) >> 2) & 0x1) == 0x1)
+ footerEnd -= 4;
+
+ return F.t(start + footerStart, start + footerEnd);
+ }
+
+ /**
+ * Get raw offset of the object.
+ *
+ * @param in Input stream.
+ * @param start Object start position inside the stream.
+ * @return Raw offset.
+ */
+ public static int rawOffsetAbsolute(PortablePositionReadable in, int start) {
+ int len = length(in, start);
+
+ short flags = in.readShortPositioned(start + GridPortableMarshaller.FLAGS_POS);
+
+ if (PortableUtils.isRawOnly(flags))
+ // No schema, raw offset is located on schema offset position.
+ return start + in.readIntPositioned(start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS);
+ else {
+ // Schema exists.
+ int schemaOff = in.readIntPositioned(start + GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS);
+
+ if ((((len - schemaOff) >> 2) & 0x1) == 0x0)
+ // Even amount of records in schema => no raw offset.
+ return start + schemaOff;
+ else
+ // Odd amount of records in schema => raw offset is the very last 4 bytes in object.
+ return start + in.readIntPositioned(start + len - 4);
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java
index 19795ee..227087b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableWriterExImpl.java
@@ -22,6 +22,7 @@ import org.apache.ignite.internal.portable.streams.PortableHeapOutputStream;
import org.apache.ignite.internal.portable.streams.PortableOutputStream;
import org.apache.ignite.internal.util.typedef.internal.A;
import org.apache.ignite.portable.PortableException;
+import org.apache.ignite.portable.PortableIdMapper;
import org.apache.ignite.portable.PortableRawWriter;
import org.apache.ignite.portable.PortableWriter;
import org.jetbrains.annotations.Nullable;
@@ -55,6 +56,7 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.DOUBLE_ARR;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.ENUM;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.ENUM_ARR;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLAGS_POS;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.FLOAT_ARR;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.INT;
@@ -68,7 +70,8 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.OBJ;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.OBJ_ARR;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.OPTM_MARSH;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.PORTABLE_OBJ;
-import static org.apache.ignite.internal.portable.GridPortableMarshaller.RAW_DATA_OFF_POS;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.SCHEMA_ID_POS;
+import static org.apache.ignite.internal.portable.GridPortableMarshaller.SCHEMA_OR_RAW_OFF_POS;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.SHORT_ARR;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING;
@@ -90,17 +93,20 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
/** */
private static final int INIT_CAP = 1024;
- /** */
- private final PortableContext ctx;
+ /** FNV1 hash offset basis. */
+ private static final int FNV1_OFFSET_BASIS = 0x811C9DC5;
- /** */
- private final WriterContext wCtx;
+ /** FNV1 hash prime. */
+ private static final int FNV1_PRIME = 0x01000193;
+
+ /** Thread-local schema. */
+ private static final ThreadLocal<SchemaHolder> SCHEMA = new ThreadLocal<>();
/** */
- private final int start;
+ private final PortableContext ctx;
/** */
- private int mark;
+ private final int start;
/** */
private Class<?> cls;
@@ -108,8 +114,8 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
/** */
private int typeId;
- /** */
- private boolean allowFields = true;
+ /** Raw offset position. */
+ private int rawOffPos;
/** */
private boolean metaEnabled;
@@ -117,64 +123,68 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
/** */
private int metaHashSum;
- /**
- * @param ctx Context.
- * @param off Start offset.
- */
- PortableWriterExImpl(PortableContext ctx, int off) {
- this.ctx = ctx;
+ /** Handles. */
+ private Map<Object, Integer> handles;
- PortableOutputStream out = new PortableHeapOutputStream(off + INIT_CAP);
+ /** Output stream. */
+ private PortableOutputStream out;
- out.position(off);
+ /** Schema. */
+ private SchemaHolder schema;
- wCtx = new WriterContext(out, null);
+ /** Schema ID. */
+ private int schemaId;
- start = off;
- }
+ /** Amount of written fields. */
+ private int fieldCnt;
+
+ /** ID mapper. */
+ private PortableIdMapper idMapper;
/**
* @param ctx Context.
- * @param out Output stream.
- * @param off Start offset.
*/
- PortableWriterExImpl(PortableContext ctx, PortableOutputStream out, int off) {
- this.ctx = ctx;
-
- wCtx = new WriterContext(out, null);
-
- start = off;
+ PortableWriterExImpl(PortableContext ctx) {
+ this(ctx, new PortableHeapOutputStream(INIT_CAP));
}
/**
* @param ctx Context.
- * @param off Start offset.
- * @param typeId Type ID.
+ * @param out Output stream.
*/
- public PortableWriterExImpl(PortableContext ctx, int off, int typeId, boolean metaEnabled) {
- this(ctx, off);
+ PortableWriterExImpl(PortableContext ctx, PortableOutputStream out) {
+ this(ctx, out, new IdentityHashMap<Object, Integer>());
+ }
- this.typeId = typeId;
+ /**
+ * @param ctx Context.
+ * @param out Output stream.
+ * @param handles Handles.
+ */
+ private PortableWriterExImpl(PortableContext ctx, PortableOutputStream out, Map<Object, Integer> handles) {
+ this.ctx = ctx;
+ this.out = out;
+ this.handles = handles;
- this.metaEnabled = metaEnabled;
- }
+ start = out.position();
+ }
/**
* @param ctx Context.
- * @param wCtx Writer context.
+ * @param typeId Type ID.
*/
- private PortableWriterExImpl(PortableContext ctx, WriterContext wCtx) {
- this.ctx = ctx;
- this.wCtx = wCtx;
+ public PortableWriterExImpl(PortableContext ctx, int typeId, boolean metaEnabled) {
+ this(ctx);
- start = wCtx.out.position();
+ this.typeId = typeId;
+ this.metaEnabled = metaEnabled;
}
/**
* Close the writer releasing resources if necessary.
*/
@Override public void close() {
- wCtx.out.close();
+ out.close();
}
/**
@@ -186,20 +196,18 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
/**
* @param obj Object.
- * @param detached Detached or not.
* @throws PortableException In case of error.
*/
- void marshal(Object obj, boolean detached) throws PortableException {
- marshal(obj, detached, true);
+ void marshal(Object obj) throws PortableException {
+ marshal(obj, true);
}
/**
* @param obj Object.
- * @param detached Detached or not.
* @param enableReplace Object replacing enabled flag.
* @throws PortableException In case of error.
*/
- void marshal(Object obj, boolean detached, boolean enableReplace) throws PortableException {
+ void marshal(Object obj, boolean enableReplace) throws PortableException {
assert obj != null;
cls = obj.getClass();
@@ -252,7 +260,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
return;
}
- marshal(replacedObj, detached, false);
+ marshal(replacedObj, false);
return;
}
@@ -261,9 +269,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
metaEnabled = ctx.isMetaDataEnabled(typeId);
- if (detached)
- wCtx.resetHandles();
-
desc.write(obj, this);
}
@@ -274,28 +279,29 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
int handle(Object obj) {
assert obj != null;
- return wCtx.handle(obj);
+ Integer h = handles.get(obj);
+
+ if (h != null)
+ return out.position() - h;
+ else {
+ handles.put(obj, out.position());
+
+ return -1;
+ }
}
/**
* @return Array.
*/
public byte[] array() {
- return wCtx.out.arrayCopy();
- }
-
- /**
- * @return Output stream.
- */
- public PortableOutputStream outputStream() {
- return wCtx.out;
+ return out.arrayCopy();
}
/**
* @return Stream current position.
*/
int position() {
- return wCtx.out.position();
+ return out.position();
}
/**
@@ -304,7 +310,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param pos Position.
*/
void position(int pos) {
- wCtx.out.position(pos);
+ out.position(pos);
}
/**
@@ -312,45 +318,48 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @return Offset.
*/
public int reserve(int bytes) {
- int pos = wCtx.out.position();
+ int pos = out.position();
- wCtx.out.position(pos + bytes);
+ out.position(pos + bytes);
return pos;
}
/**
- * @param bytes Number of bytes to reserve.
- * @return Offset.
+ * Perform post-write activity. This includes:
+ * - writing object length;
+ * - writing schema offset;
+ * - writing schema to the tail.
+ *
+ * @param userType User type flag.
*/
- public int reserveAndMark(int bytes) {
- int off0 = reserve(bytes);
+ public void postWrite(boolean userType) {
+ if (schema != null) {
+ // Write schema ID.
+ out.writeInt(start + SCHEMA_ID_POS, schemaId);
- mark = wCtx.out.position();
+ // Write schema offset.
+ out.writeInt(start + SCHEMA_OR_RAW_OFF_POS, out.position() - start);
- return off0;
- }
+ // Write the schema.
+ schema.writeAndPop(this, fieldCnt);
- /**
- * @param off Offset.
- */
- public void writeDelta(int off) {
- wCtx.out.writeInt(off, wCtx.out.position() - mark);
- }
+ // Write raw offset if needed.
+ if (rawOffPos != 0)
+ out.writeInt(rawOffPos - start);
+ }
+ else {
+ // No schema was written, so set the raw-only flag.
+ int flags = (userType ? PortableUtils.FLAG_USR_TYP : 0) | PortableUtils.FLAG_RAW_ONLY;
- /**
- *
- */
- public void writeLength() {
- wCtx.out.writeInt(start + TOTAL_LEN_POS, wCtx.out.position() - start);
- }
+ out.writeShort(start + FLAGS_POS, (short)flags);
- /**
- *
- */
- public void writeRawOffsetIfNeeded() {
- if (allowFields)
- wCtx.out.writeInt(start + RAW_DATA_OFF_POS, wCtx.out.position() - start);
+ // If there is no schema, we are free to write raw offset to the schema offset position.
+ out.writeInt(start + SCHEMA_OR_RAW_OFF_POS, (rawOffPos == 0 ? out.position() : rawOffPos) - start);
+ }
+
+ // Write length.
+ out.writeInt(start + TOTAL_LEN_POS, out.position() - start);
}
/**
@@ -359,7 +368,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
public void write(byte[] val) {
assert val != null;
- wCtx.out.writeByteArray(val);
+ out.writeByteArray(val);
}
/**
@@ -370,63 +379,63 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
public void write(byte[] val, int off, int len) {
assert val != null;
- wCtx.out.write(val, off, len);
+ out.write(val, off, len);
}
/**
* @param val Value.
*/
public void doWriteByte(byte val) {
- wCtx.out.writeByte(val);
+ out.writeByte(val);
}
/**
* @param val Value.
*/
public void doWriteShort(short val) {
- wCtx.out.writeShort(val);
+ out.writeShort(val);
}
/**
* @param val Value.
*/
public void doWriteInt(int val) {
- wCtx.out.writeInt(val);
+ out.writeInt(val);
}
/**
* @param val Value.
*/
public void doWriteLong(long val) {
- wCtx.out.writeLong(val);
+ out.writeLong(val);
}
/**
* @param val Value.
*/
public void doWriteFloat(float val) {
- wCtx.out.writeFloat(val);
+ out.writeFloat(val);
}
/**
* @param val Value.
*/
public void doWriteDouble(double val) {
- wCtx.out.writeDouble(val);
+ out.writeDouble(val);
}
/**
* @param val Value.
*/
public void doWriteChar(char val) {
- wCtx.out.writeChar(val);
+ out.writeChar(val);
}
/**
* @param val Value.
*/
public void doWriteBoolean(boolean val) {
- wCtx.out.writeBoolean(val);
+ out.writeBoolean(val);
}
/**
@@ -443,15 +452,15 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
if (intVal.signum() == -1) {
intVal = intVal.negate();
- wCtx.out.writeInt(val.scale() | 0x80000000);
+ out.writeInt(val.scale() | 0x80000000);
}
else
- wCtx.out.writeInt(val.scale());
+ out.writeInt(val.scale());
byte[] vals = intVal.toByteArray();
- wCtx.out.writeInt(vals.length);
- wCtx.out.writeByteArray(vals);
+ out.writeInt(vals.length);
+ out.writeByteArray(vals);
}
}
@@ -471,7 +480,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteInt(strArr.length);
- wCtx.out.writeByteArray(strArr);
+ out.writeByteArray(strArr);
}
else {
doWriteBoolean(false);
@@ -480,7 +489,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteInt(strArr.length);
- wCtx.out.writeCharArray(strArr);
+ out.writeCharArray(strArr);
}
}
}
@@ -510,36 +519,32 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
}
}
- /**
- * @param ts Timestamp.
- */
- public void doWriteTimestamp(@Nullable Timestamp ts) {
- if (ts== null)
- doWriteByte(NULL);
- else {
- doWriteByte(TIMESTAMP);
- doWriteLong(ts.getTime());
- doWriteInt(ts.getNanos() % 1000000);
- }
- }
+ /**
+ * @param ts Timestamp.
+ */
+ public void doWriteTimestamp(@Nullable Timestamp ts) {
+ if (ts== null)
+ doWriteByte(NULL);
+ else {
+ doWriteByte(TIMESTAMP);
+ doWriteLong(ts.getTime());
+ doWriteInt(ts.getNanos() % 1000000);
+ }
+ }
/**
+ * Write object.
+ *
* @param obj Object.
- * @param detached Detached or not.
* @throws PortableException In case of error.
*/
- public void doWriteObject(@Nullable Object obj, boolean detached) throws PortableException {
+ public void doWriteObject(@Nullable Object obj) throws PortableException {
if (obj == null)
doWriteByte(NULL);
else {
- WriterContext wCtx = detached ? new WriterContext(this.wCtx.out, this.wCtx.handles) : this.wCtx;
-
- PortableWriterExImpl writer = new PortableWriterExImpl(ctx, wCtx);
-
- writer.marshal(obj, detached);
+ PortableWriterExImpl writer = new PortableWriterExImpl(ctx, out, handles);
- if (detached)
- this.wCtx.out = wCtx.out;
+ writer.marshal(obj);
}
}
@@ -556,7 +561,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(BYTE_ARR);
doWriteInt(val.length);
- wCtx.out.writeByteArray(val);
+ out.writeByteArray(val);
}
}
@@ -573,7 +578,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(SHORT_ARR);
doWriteInt(val.length);
- wCtx.out.writeShortArray(val);
+ out.writeShortArray(val);
}
}
@@ -590,7 +595,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(INT_ARR);
doWriteInt(val.length);
- wCtx.out.writeIntArray(val);
+ out.writeIntArray(val);
}
}
@@ -607,7 +612,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(LONG_ARR);
doWriteInt(val.length);
- wCtx.out.writeLongArray(val);
+ out.writeLongArray(val);
}
}
@@ -624,7 +629,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(FLOAT_ARR);
doWriteInt(val.length);
- wCtx.out.writeFloatArray(val);
+ out.writeFloatArray(val);
}
}
@@ -641,7 +646,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(DOUBLE_ARR);
doWriteInt(val.length);
- wCtx.out.writeDoubleArray(val);
+ out.writeDoubleArray(val);
}
}
@@ -658,7 +663,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(CHAR_ARR);
doWriteInt(val.length);
- wCtx.out.writeCharArray(val);
+ out.writeCharArray(val);
}
}
@@ -675,7 +680,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(BOOLEAN_ARR);
doWriteInt(val.length);
- wCtx.out.writeBooleanArray(val);
+ out.writeBooleanArray(val);
}
}
@@ -794,7 +799,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteInt(val.length);
for (Object obj : val)
- doWriteObject(obj, false);
+ doWriteObject(obj);
}
}
@@ -814,7 +819,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(ctx.collectionType(col.getClass()));
for (Object obj : col)
- doWriteObject(obj, false);
+ doWriteObject(obj);
}
}
@@ -834,8 +839,8 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteByte(ctx.mapType(map.getClass()));
for (Map.Entry<?, ?> e : map.entrySet()) {
- doWriteObject(e.getKey(), false);
- doWriteObject(e.getValue(), false);
+ doWriteObject(e.getKey());
+ doWriteObject(e.getValue());
}
}
}
@@ -852,8 +857,8 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
return;
doWriteByte(MAP_ENTRY);
- doWriteObject(e.getKey(), false);
- doWriteObject(e.getValue(), false);
+ doWriteObject(e.getKey());
+ doWriteObject(e.getValue());
}
}
@@ -939,7 +944,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
doWriteInt(poArr.length);
- wCtx.out.writeByteArray(poArr);
+ out.writeByteArray(poArr);
doWriteInt(po.start());
}
@@ -949,8 +954,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeByteField(@Nullable Byte val) {
- doWriteInt(val != null ? 2 : 1);
-
if (val == null)
doWriteByte(NULL);
else {
@@ -963,19 +966,13 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Class.
*/
void writeClassField(@Nullable Class val) {
- int lenPos = reserveAndMark(4);
-
doWriteClass(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeShortField(@Nullable Short val) {
- doWriteInt(val != null ? 3 : 1);
-
if (val == null)
doWriteByte(NULL);
else {
@@ -988,8 +985,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeIntField(@Nullable Integer val) {
- doWriteInt(val != null ? 5 : 1);
-
if (val == null)
doWriteByte(NULL);
else {
@@ -1002,8 +997,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeLongField(@Nullable Long val) {
- doWriteInt(val != null ? 9 : 1);
-
if (val == null)
doWriteByte(NULL);
else {
@@ -1016,8 +1009,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeFloatField(@Nullable Float val) {
- doWriteInt(val != null ? 5 : 1);
-
if (val == null)
doWriteByte(NULL);
else {
@@ -1030,8 +1021,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeDoubleField(@Nullable Double val) {
- doWriteInt(val != null ? 9 : 1);
-
if (val == null)
doWriteByte(NULL);
else {
@@ -1044,8 +1033,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeCharField(@Nullable Character val) {
- doWriteInt(val != null ? 3 : 1);
-
if (val == null)
doWriteByte(NULL);
else {
@@ -1058,8 +1045,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeBooleanField(@Nullable Boolean val) {
- doWriteInt(val != null ? 2 : 1);
-
if (val == null)
doWriteByte(NULL);
else {
@@ -1072,29 +1057,20 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeDecimalField(@Nullable BigDecimal val) {
- int lenPos = reserveAndMark(4);
-
doWriteDecimal(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeStringField(@Nullable String val) {
- int lenPos = reserveAndMark(4);
-
doWriteString(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeUuidField(@Nullable UUID val) {
- doWriteInt(val != null ? 17 : 1);
doWriteUuid(val);
}
@@ -1102,7 +1078,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeDateField(@Nullable Date val) {
- doWriteInt(val != null ? 9 : 1);
doWriteDate(val);
}
@@ -1110,7 +1085,6 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @param val Value.
*/
void writeTimestampField(@Nullable Timestamp val) {
- doWriteInt(val != null ? 13 : 1);
doWriteTimestamp(val);
}
@@ -1119,154 +1093,98 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @throws PortableException In case of error.
*/
void writeObjectField(@Nullable Object obj) throws PortableException {
- int lenPos = reserveAndMark(4);
-
- doWriteObject(obj, false);
-
- writeDelta(lenPos);
+ doWriteObject(obj);
}
/**
* @param val Value.
*/
void writeByteArrayField(@Nullable byte[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteByteArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeShortArrayField(@Nullable short[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteShortArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeIntArrayField(@Nullable int[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteIntArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeLongArrayField(@Nullable long[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteLongArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeFloatArrayField(@Nullable float[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteFloatArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeDoubleArrayField(@Nullable double[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteDoubleArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeCharArrayField(@Nullable char[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteCharArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeBooleanArrayField(@Nullable boolean[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteBooleanArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeDecimalArrayField(@Nullable BigDecimal[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteDecimalArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeStringArrayField(@Nullable String[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteStringArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeUuidArrayField(@Nullable UUID[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteUuidArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeDateArrayField(@Nullable Date[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteDateArray(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeTimestampArrayField(@Nullable Timestamp[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteTimestampArray(val);
-
- writeDelta(lenPos);
}
/**
@@ -1274,11 +1192,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @throws PortableException In case of error.
*/
void writeObjectArrayField(@Nullable Object[] val) throws PortableException {
- int lenPos = reserveAndMark(4);
-
doWriteObjectArray(val);
-
- writeDelta(lenPos);
}
/**
@@ -1286,11 +1200,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @throws PortableException In case of error.
*/
void writeCollectionField(@Nullable Collection<?> col) throws PortableException {
- int lenPos = reserveAndMark(4);
-
doWriteCollection(col);
-
- writeDelta(lenPos);
}
/**
@@ -1298,11 +1208,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @throws PortableException In case of error.
*/
void writeMapField(@Nullable Map<?, ?> map) throws PortableException {
- int lenPos = reserveAndMark(4);
-
doWriteMap(map);
-
- writeDelta(lenPos);
}
/**
@@ -1310,33 +1216,21 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @throws PortableException In case of error.
*/
void writeMapEntryField(@Nullable Map.Entry<?, ?> e) throws PortableException {
- int lenPos = reserveAndMark(4);
-
doWriteMapEntry(e);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeEnumField(@Nullable Enum<?> val) {
- int lenPos = reserveAndMark(4);
-
doWriteEnum(val);
-
- writeDelta(lenPos);
}
/**
* @param val Value.
*/
void writeEnumArrayField(@Nullable Object[] val) {
- int lenPos = reserveAndMark(4);
-
doWriteEnumArray(val);
-
- writeDelta(lenPos);
}
/**
@@ -1344,11 +1238,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @throws PortableException In case of error.
*/
void writePortableObjectField(@Nullable PortableObjectImpl po) throws PortableException {
- int lenPos = reserveAndMark(4);
-
doWritePortableObject(po);
-
- writeDelta(lenPos);
}
/** {@inheritDoc} */
@@ -1502,12 +1392,18 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
/** {@inheritDoc} */
@Override public void writeObject(@Nullable Object obj) throws PortableException {
- doWriteObject(obj, false);
+ doWriteObject(obj);
}
/** {@inheritDoc} */
@Override public void writeObjectDetached(@Nullable Object obj) throws PortableException {
- doWriteObject(obj, true);
+ if (obj == null)
+ doWriteByte(NULL);
+ else {
+ PortableWriterExImpl writer = new PortableWriterExImpl(ctx, out, new IdentityHashMap<Object, Integer>());
+
+ writer.marshal(obj);
+ }
}
/** {@inheritDoc} */
@@ -1716,21 +1612,19 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
/** {@inheritDoc} */
@Override public PortableRawWriter rawWriter() {
- if (allowFields) {
- wCtx.out.writeInt(start + RAW_DATA_OFF_POS, wCtx.out.position() - start);
-
- allowFields = false;
- }
+ if (rawOffPos == 0)
+ rawOffPos = out.position();
return this;
}
/** {@inheritDoc} */
@Override public PortableOutputStream out() {
- return wCtx.out;
+ return out;
}
/** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
@Override public void writeBytes(String s) throws IOException {
int len = s.length();
@@ -1741,6 +1635,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
}
/** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
@Override public void writeChars(String s) throws IOException {
int len = s.length();
@@ -1751,28 +1646,29 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
}
/** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
@Override public void writeUTF(String s) throws IOException {
writeString(s);
}
/** {@inheritDoc} */
@Override public void writeByte(int v) throws IOException {
- doWriteByte((byte)v);
+ doWriteByte((byte) v);
}
/** {@inheritDoc} */
@Override public void writeShort(int v) throws IOException {
- doWriteShort((short)v);
+ doWriteShort((short) v);
}
/** {@inheritDoc} */
@Override public void writeChar(int v) throws IOException {
- doWriteChar((char)v);
+ doWriteChar((char) v);
}
/** {@inheritDoc} */
@Override public void write(int b) throws IOException {
- doWriteByte((byte)b);
+ doWriteByte((byte) b);
}
/** {@inheritDoc} */
@@ -1787,7 +1683,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
/** {@inheritDoc} */
@Override public void writeInt(int pos, int val) throws PortableException {
- wCtx.out.writeInt(pos, val);
+ out.writeInt(pos, val);
}
/**
@@ -1797,16 +1693,56 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
private void writeFieldId(String fieldName, byte fieldType) throws PortableException {
A.notNull(fieldName, "fieldName");
- if (!allowFields)
+ if (rawOffPos != 0)
throw new PortableException("Individual field can't be written after raw writer is acquired " +
"via rawWriter() method. Consider fixing serialization logic for class: " + cls.getName());
- int id = ctx.fieldId(typeId, fieldName);
+ if (idMapper == null)
+ idMapper = ctx.userTypeIdMapper(typeId);
+
+ int id = idMapper.fieldId(typeId, fieldName);
+
+ writeFieldId(id);
if (metaEnabled)
metaHashSum = 31 * metaHashSum + (id + fieldType);
+ }
+
+ /**
+ * Write field ID.
+ * @param fieldId Field ID.
+ */
+ public void writeFieldId(int fieldId) {
+ int fieldOff = out.position() - start;
- doWriteInt(id);
+ if (schema == null) {
+ schema = SCHEMA.get();
+
+ if (schema == null) {
+ schema = new SchemaHolder();
+
+ SCHEMA.set(schema);
+ }
+
+ // Seed the schema hash with the FNV1 offset basis when the first field is written.
+ schemaId = FNV1_OFFSET_BASIS;
+ }
+
+ // Advance schema hash.
+ int schemaId0 = schemaId ^ (fieldId & 0xFF);
+ schemaId0 = schemaId0 * FNV1_PRIME;
+ schemaId0 = schemaId0 ^ ((fieldId >> 8) & 0xFF);
+ schemaId0 = schemaId0 * FNV1_PRIME;
+ schemaId0 = schemaId0 ^ ((fieldId >> 16) & 0xFF);
+ schemaId0 = schemaId0 * FNV1_PRIME;
+ schemaId0 = schemaId0 ^ ((fieldId >> 24) & 0xFF);
+ schemaId0 = schemaId0 * FNV1_PRIME;
+
+ schemaId = schemaId0;
+
+ schema.push(fieldId, fieldOff);
+
+ fieldCnt++;
}
/**
@@ -1835,7 +1771,7 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
* @return New writer.
*/
public PortableWriterExImpl newWriter(int typeId) {
- PortableWriterExImpl res = new PortableWriterExImpl(ctx, wCtx);
+ PortableWriterExImpl res = new PortableWriterExImpl(ctx, out, handles);
res.typeId = typeId;
@@ -1849,48 +1785,71 @@ public class PortableWriterExImpl implements PortableWriter, PortableRawWriterEx
return ctx;
}
- /** */
- private static class WriterContext {
- /** */
- private Map<Object, Integer> handles = new IdentityHashMap<>();
+ /**
+ * Schema holder.
+ */
+ private static class SchemaHolder {
+ /** Grow step. */
+ private static final int GROW_STEP = 16;
+
+ /** Maximum data array length retained once the holder is empty; larger arrays are shrunk back to this size. */
+ private static final int MAX_SIZE = 256;
- /** Output stream. */
- private PortableOutputStream out;
+ /** Data. */
+ private int[] data;
+
+ /** Index. */
+ private int idx;
/**
* Constructor.
- *
- * @param out Output stream.
- * @param handles Handles.
*/
- private WriterContext(PortableOutputStream out, Map<Object, Integer> handles) {
- this.out = out;
- this.handles = handles == null ? new IdentityHashMap<Object, Integer>() : handles;
+ public SchemaHolder() {
+ data = new int[GROW_STEP];
}
/**
- * @param obj Object.
- * @return Handle.
+ * Push another frame.
+ *
+ * @param id Field ID.
+ * @param off Field offset.
*/
- private int handle(Object obj) {
- assert obj != null;
-
- Integer h = handles.get(obj);
+ public void push(int id, int off) {
+ if (idx == data.length) {
+ int[] data0 = new int[data.length + GROW_STEP];
- if (h != null)
- return out.position() - h;
- else {
- handles.put(obj, out.position());
+ System.arraycopy(data, 0, data0, 0, data.length);
- return -1;
+ data = data0;
}
+
+ data[idx] = id;
+ data[idx + 1] = off;
+
+ idx += 2;
}
/**
+ * Write collected frames and pop them.
*
+ * @param writer Writer.
+ * @param cnt Count.
*/
- private void resetHandles() {
- handles = new IdentityHashMap<>();
+ public void writeAndPop(PortableWriterExImpl writer, int cnt) {
+ int startIdx = idx - cnt * 2;
+
+ assert startIdx >= 0;
+
+ for (int idx0 = startIdx; idx0 < idx;) {
+ writer.writeInt(data[idx0++]);
+ writer.writeInt(data[idx0++]);
+ }
+
+ idx = startIdx;
+
+ // Shrink data array if needed.
+ if (idx == 0 && data.length > MAX_SIZE)
+ data = new int[MAX_SIZE];
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java
index 00fc866..442fc35 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderImpl.java
@@ -17,33 +17,32 @@
package org.apache.ignite.internal.portable.builder;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.Map;
-import java.util.Set;
+import org.apache.ignite.internal.portable.PortableContext;
+import org.apache.ignite.internal.portable.PortableObjectImpl;
+import org.apache.ignite.internal.portable.PortableObjectOffheapImpl;
+import org.apache.ignite.internal.portable.PortableUtils;
+import org.apache.ignite.internal.portable.PortableWriterExImpl;
import org.apache.ignite.internal.processors.cache.portable.CacheObjectPortableProcessorImpl;
import org.apache.ignite.internal.util.GridArgumentCheck;
+import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.portable.PortableBuilder;
import org.apache.ignite.portable.PortableException;
import org.apache.ignite.portable.PortableInvalidClassException;
import org.apache.ignite.portable.PortableMetadata;
import org.apache.ignite.portable.PortableObject;
import org.jetbrains.annotations.Nullable;
-import org.apache.ignite.internal.portable.*;
-import org.apache.ignite.internal.processors.cache.portable.*;
-import org.apache.ignite.internal.util.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.portable.*;
-import static org.apache.ignite.internal.portable.GridPortableMarshaller.CLS_NAME_POS;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
import static org.apache.ignite.internal.portable.GridPortableMarshaller.DFLT_HDR_LEN;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.HASH_CODE_POS;
-import static org.apache.ignite.internal.portable.GridPortableMarshaller.PROTO_VER;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.PROTO_VER_POS;
-import static org.apache.ignite.internal.portable.GridPortableMarshaller.RAW_DATA_OFF_POS;
-import static org.apache.ignite.internal.portable.GridPortableMarshaller.TOTAL_LEN_POS;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.TYPE_ID_POS;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.UNREGISTERED_TYPE_ID;
@@ -143,14 +142,14 @@ public class PortableBuilderImpl implements PortableBuilder {
PortableUtils.checkProtocolVersion(ver);
- int typeId = reader.readIntAbsolute(start + TYPE_ID_POS);
+ int typeId = reader.readIntPositioned(start + TYPE_ID_POS);
ctx = reader.portableContext();
- hashCode = reader.readIntAbsolute(start + HASH_CODE_POS);
+ hashCode = reader.readIntPositioned(start + HASH_CODE_POS);
if (typeId == UNREGISTERED_TYPE_ID) {
int mark = reader.position();
- reader.position(start + CLS_NAME_POS);
+ reader.position(start + DFLT_HDR_LEN);
clsNameToWrite = reader.readString();
@@ -180,7 +179,7 @@ public class PortableBuilderImpl implements PortableBuilder {
/** {@inheritDoc} */
@Override public PortableObject build() {
- try (PortableWriterExImpl writer = new PortableWriterExImpl(ctx, 0, typeId, false)) {
+ try (PortableWriterExImpl writer = new PortableWriterExImpl(ctx, typeId, false)) {
PortableBuilderSerializer serializationCtx = new PortableBuilderSerializer();
@@ -199,17 +198,11 @@ public class PortableBuilderImpl implements PortableBuilder {
* @param serializer Serializer.
*/
void serializeTo(PortableWriterExImpl writer, PortableBuilderSerializer serializer) {
- writer.doWriteByte(GridPortableMarshaller.OBJ);
- writer.doWriteByte(PROTO_VER);
- writer.doWriteBoolean(true);
- writer.doWriteInt(registeredType ? typeId : UNREGISTERED_TYPE_ID);
- writer.doWriteInt(hashCode);
-
- // Length and raw offset.
- writer.reserve(8);
-
- if (!registeredType)
- writer.writeString(clsNameToWrite);
+ PortableUtils.writeHeader(writer,
+ true,
+ registeredType ? typeId : UNREGISTERED_TYPE_ID,
+ hashCode,
+ registeredType ? null : clsNameToWrite);
Set<Integer> remainsFlds = null;
@@ -230,84 +223,68 @@ public class PortableBuilderImpl implements PortableBuilder {
else
assignedFldsById = Collections.emptyMap();
- int rawOff = start + reader.readIntAbsolute(start + RAW_DATA_OFF_POS);
-
- reader.position(start + hdrLen);
+ // Get footer details.
+ IgniteBiTuple<Integer, Integer> footer = PortableUtils.footerAbsolute(reader, start);
- int cpStart = -1;
+ int footerPos = footer.get1();
+ int footerEnd = footer.get2();
- while (reader.position() < rawOff) {
- int fldId = reader.readInt();
+ // Get raw position.
+ int rawPos = PortableUtils.rawOffsetAbsolute(reader, start);
- int len = reader.readInt();
+ // Position reader on data.
+ reader.position(start + hdrLen);
- if (assignedFldsById.containsKey(fldId)) {
- if (cpStart >= 0) {
- writer.write(reader.array(), cpStart, reader.position() - 4 - 4 - cpStart);
+ while (reader.position() < rawPos) {
+ int fieldId = reader.readIntPositioned(footerPos);
+ int fieldLen = fieldPositionAndLength(footerPos, footerEnd, rawPos).get2();
- cpStart = -1;
- }
+ footerPos += 8;
- Object assignedVal = assignedFldsById.remove(fldId);
+ if (assignedFldsById.containsKey(fieldId)) {
+ Object assignedVal = assignedFldsById.remove(fieldId);
- reader.skip(len);
+ reader.skip(fieldLen);
if (assignedVal != REMOVED_FIELD_MARKER) {
- writer.writeInt(fldId);
-
- int lenPos = writer.reserveAndMark(4);
+ writer.writeFieldId(fieldId);
serializer.writeValue(writer, assignedVal);
-
- writer.writeDelta(lenPos);
}
}
else {
- int type = len != 0 ? reader.readByte(0) : 0;
+ int type = fieldLen != 0 ? reader.readByte(0) : 0;
- if (len != 0 && !PortableUtils.isPlainArrayType(type) && PortableUtils.isPlainType(type)) {
- if (cpStart < 0)
- cpStart = reader.position() - 4 - 4;
+ if (fieldLen != 0 && !PortableUtils.isPlainArrayType(type) && PortableUtils.isPlainType(type)) {
+ writer.writeFieldId(fieldId);
+ writer.write(reader.array(), reader.position(), fieldLen);
- reader.skip(len);
+ reader.skip(fieldLen);
}
else {
- if (cpStart >= 0) {
- writer.write(reader.array(), cpStart, reader.position() - 4 - cpStart);
-
- cpStart = -1;
- }
- else
- writer.writeInt(fldId);
+ writer.writeFieldId(fieldId);
Object val;
- if (len == 0)
+ if (fieldLen == 0)
val = null;
else if (readCache == null) {
int savedPos = reader.position();
val = reader.parseValue();
- assert reader.position() == savedPos + len;
+ assert reader.position() == savedPos + fieldLen;
}
else {
- val = readCache.get(fldId);
+ val = readCache.get(fieldId);
- reader.skip(len);
+ reader.skip(fieldLen);
}
- int lenPos = writer.reserveAndMark(4);
-
serializer.writeValue(writer, val);
-
- writer.writeDelta(lenPos);
}
}
}
-
- if (cpStart >= 0)
- writer.write(reader.array(), cpStart, reader.position() - cpStart);
}
if (assignedVals != null && (remainsFlds == null || !remainsFlds.isEmpty())) {
@@ -333,14 +310,10 @@ public class PortableBuilderImpl implements PortableBuilder {
if (remainsFlds != null && !remainsFlds.contains(fldId))
continue;
- writer.writeInt(fldId);
-
- int lenPos = writer.reserveAndMark(4);
+ writer.writeFieldId(fldId);
serializer.writeValue(writer, val);
- writer.writeDelta(lenPos);
-
if (metadataEnabled) {
String oldFldTypeName = metadata == null ? null : metadata.fieldTypeName(name);
@@ -387,17 +360,22 @@ public class PortableBuilderImpl implements PortableBuilder {
}
}
- writer.writeRawOffsetIfNeeded();
-
if (reader != null) {
- int rawOff = reader.readIntAbsolute(start + RAW_DATA_OFF_POS);
- int len = reader.readIntAbsolute(start + TOTAL_LEN_POS);
+ // Write raw data if any.
+ int rawOff = PortableUtils.rawOffsetAbsolute(reader, start);
+ int footerStart = PortableUtils.footerStartAbsolute(reader, start);
+
+ if (rawOff < footerStart) {
+ writer.rawWriter();
+
+ writer.write(reader.array(), rawOff, footerStart - rawOff);
+ }
- if (rawOff < len)
- writer.write(reader.array(), rawOff, len - rawOff);
+ // Shift reader to the end of the object.
+ reader.position(start + PortableUtils.length(reader, start));
}
- writer.writeLength();
+ writer.postWrite(true);
}
/** {@inheritDoc} */
@@ -408,29 +386,58 @@ public class PortableBuilderImpl implements PortableBuilder {
}
/**
+ * Get field position and length.
*
+ * @param footerPos Field position inside the footer (absolute).
+ * @param footerEnd Footer end (absolute).
+ * @param rawPos Raw data position (absolute).
+ * @return Tuple with field position and length.
+ */
+ private IgniteBiTuple<Integer, Integer> fieldPositionAndLength(int footerPos, int footerEnd, int rawPos) {
+ int fieldOffset = reader.readIntPositioned(footerPos + 4);
+ int fieldPos = start + fieldOffset;
+
+ // Get field length.
+ int fieldLen;
+
+ if (footerPos + 8 == footerEnd)
+ // This is the last field, compare to raw offset.
+ fieldLen = rawPos - fieldPos;
+ else {
+ // Field is somewhere in the middle, get difference with the next offset.
+ int nextFieldOffset = reader.readIntPositioned(footerPos + 8 + 4);
+
+ fieldLen = nextFieldOffset - fieldOffset;
+ }
+
+ return F.t(fieldPos, fieldLen);
+ }
+
+ /**
+ * Initialize read cache if needed.
*/
private void ensureReadCacheInit() {
if (readCache == null) {
Map<Integer, Object> readCache = new HashMap<>();
- int pos = start + hdrLen;
- int end = start + reader.readIntAbsolute(start + RAW_DATA_OFF_POS);
+ IgniteBiTuple<Integer, Integer> footer = PortableUtils.footerAbsolute(reader, start);
- while (pos < end) {
- int fieldId = reader.readIntAbsolute(pos);
+ int footerPos = footer.get1();
+ int footerEnd = footer.get2();
- pos += 4;
+ int rawPos = PortableUtils.rawOffsetAbsolute(reader, start);
- int len = reader.readIntAbsolute(pos);
+ while (footerPos < footerEnd) {
+ int fieldId = reader.readIntPositioned(footerPos);
- pos += 4;
+ IgniteBiTuple<Integer, Integer> posAndLen = fieldPositionAndLength(footerPos, footerEnd, rawPos);
- Object val = reader.getValueQuickly(pos, len);
+ Object val = reader.getValueQuickly(posAndLen.get1(), posAndLen.get2());
readCache.put(fieldId, val);
- pos += len;
+ // Shift current footer position.
+ footerPos += 8;
}
this.readCache = readCache;
@@ -438,7 +445,8 @@ public class PortableBuilderImpl implements PortableBuilder {
}
/** {@inheritDoc} */
- @Override public <F> F getField(String name) {
+ @SuppressWarnings("unchecked")
+ @Override public <T> T getField(String name) {
Object val;
if (assignedVals != null && assignedVals.containsKey(name)) {
@@ -455,7 +463,7 @@ public class PortableBuilderImpl implements PortableBuilder {
val = readCache.get(fldId);
}
- return (F)PortableUtils.unwrapLazy(val);
+ return (T)PortableUtils.unwrapLazy(val);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java
index afa40a3..b999cde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java
@@ -24,6 +24,7 @@ import java.util.Map;
import org.apache.ignite.internal.portable.GridPortableMarshaller;
import org.apache.ignite.internal.portable.PortableContext;
import org.apache.ignite.internal.portable.PortableObjectImpl;
+import org.apache.ignite.internal.portable.PortablePositionReadable;
import org.apache.ignite.internal.portable.PortablePrimitives;
import org.apache.ignite.internal.portable.PortableReaderExImpl;
import org.apache.ignite.internal.portable.PortableUtils;
@@ -37,7 +38,7 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING;
/**
*
*/
-class PortableBuilderReader {
+public class PortableBuilderReader implements PortablePositionReadable {
/** */
private static final PortablePrimitives PRIM = PortablePrimitives.get();
@@ -130,11 +131,13 @@ class PortableBuilderReader {
return PRIM.readByte(arr, pos);
}
- /**
- * @param pos Position in the source array.
- * @return Read int value.
- */
- public int readIntAbsolute(int pos) {
+ /** {@inheritDoc} */
+ @Override public short readShortPositioned(int pos) {
+ return PRIM.readShort(arr, pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readIntPositioned(int pos) {
return PRIM.readInt(arr, pos);
}
@@ -357,7 +360,7 @@ class PortableBuilderReader {
return null;
case GridPortableMarshaller.HANDLE: {
- int objStart = pos - readIntAbsolute(pos + 1);
+ int objStart = pos - readIntPositioned(pos + 1);
PortableBuilderImpl res = objMap.get(objStart);
@@ -451,9 +454,9 @@ class PortableBuilderReader {
}
case GridPortableMarshaller.PORTABLE_OBJ: {
- int size = readIntAbsolute(pos + 1);
+ int size = readIntPositioned(pos + 1);
- int start = readIntAbsolute(pos + 4 + size);
+ int start = readIntPositioned(pos + 4 + size);
PortableObjectImpl portableObj = new PortableObjectImpl(ctx, arr, pos + 4 + start);
@@ -747,7 +750,6 @@ class PortableBuilderReader {
return new PortablePlainPortableObject(portableObj);
}
-
default:
throw new PortableException("Invalid flag value: " + type);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java
index 2d9c961..fa08d79 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderSerializer.java
@@ -82,12 +82,12 @@ class PortableBuilderSerializer {
Integer posInResArr = objToPos.get(obj);
if (posInResArr == null) {
- objToPos.put(obj, writer.outputStream().position());
+ objToPos.put(obj, writer.out().position());
obj.serializeTo(writer.newWriter(obj.typeId()), this);
}
else {
- int handle = writer.outputStream().position() - posInResArr;
+ int handle = writer.out().position() - posInResArr;
writer.writeByte(GridPortableMarshaller.HANDLE);
writer.writeInt(handle);
@@ -177,7 +177,7 @@ class PortableBuilderSerializer {
return;
}
- writer.doWriteObject(val, false);
+ writer.doWriteObject(val);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java
index a08cfdd..f29872e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyArrayList.java
@@ -56,7 +56,7 @@ class PortableLazyArrayList extends AbstractList<Object> implements PortableBuil
*/
private void ensureDelegateInit() {
if (delegate == null) {
- int size = reader.readIntAbsolute(off + 1);
+ int size = reader.readIntPositioned(off + 1);
reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */);
@@ -125,7 +125,7 @@ class PortableLazyArrayList extends AbstractList<Object> implements PortableBuil
/** {@inheritDoc} */
@Override public int size() {
if (delegate == null)
- return reader.readIntAbsolute(off + 1);
+ return reader.readIntPositioned(off + 1);
return delegate.size();
}
@@ -133,7 +133,7 @@ class PortableLazyArrayList extends AbstractList<Object> implements PortableBuil
/** {@inheritDoc} */
@Override public void writeTo(PortableWriterExImpl writer, PortableBuilderSerializer ctx) {
if (delegate == null) {
- int size = reader.readIntAbsolute(off + 1);
+ int size = reader.readIntPositioned(off + 1);
int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java
index f793d7a..4940311 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyLinkedList.java
@@ -58,7 +58,7 @@ class PortableLazyLinkedList extends AbstractList<Object> implements PortableBui
*/
private void ensureDelegateInit() {
if (delegate == null) {
- int size = reader.readIntAbsolute(off + 1);
+ int size = reader.readIntPositioned(off + 1);
reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */);
@@ -129,7 +129,7 @@ class PortableLazyLinkedList extends AbstractList<Object> implements PortableBui
/** {@inheritDoc} */
@Override public int size() {
if (delegate == null)
- return reader.readIntAbsolute(off + 1);
+ return reader.readIntPositioned(off + 1);
return delegate.size();
}
@@ -190,7 +190,7 @@ class PortableLazyLinkedList extends AbstractList<Object> implements PortableBui
/** {@inheritDoc} */
@Override public void writeTo(PortableWriterExImpl writer, PortableBuilderSerializer ctx) {
if (delegate == null) {
- int size = reader.readIntAbsolute(off + 1);
+ int size = reader.readIntPositioned(off + 1);
int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */;
writer.write(reader.array(), off, hdrSize);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java
index 12cbfd6..74bd4c4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazyMap.java
@@ -73,7 +73,7 @@ class PortableLazyMap extends AbstractMap<Object, Object> implements PortableBui
*/
private void ensureDelegateInit() {
if (delegate == null) {
- int size = reader.readIntAbsolute(off + 1);
+ int size = reader.readIntPositioned(off + 1);
reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */);
@@ -87,7 +87,7 @@ class PortableLazyMap extends AbstractMap<Object, Object> implements PortableBui
/** {@inheritDoc} */
@Override public void writeTo(PortableWriterExImpl writer, PortableBuilderSerializer ctx) {
if (delegate == null) {
- int size = reader.readIntAbsolute(off + 1);
+ int size = reader.readIntPositioned(off + 1);
int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */;
writer.write(reader.array(), off, hdrSize);
@@ -117,7 +117,7 @@ class PortableLazyMap extends AbstractMap<Object, Object> implements PortableBui
/** {@inheritDoc} */
@Override public int size() {
if (delegate == null)
- return reader.readIntAbsolute(off + 1);
+ return reader.readIntPositioned(off + 1);
return delegate.size();
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java
index 16772af..c1099eb 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableLazySet.java
@@ -49,7 +49,7 @@ class PortableLazySet extends PortableAbstractLazyValue {
/** {@inheritDoc} */
@Override public void writeTo(PortableWriterExImpl writer, PortableBuilderSerializer ctx) {
if (val == null) {
- int size = reader.readIntAbsolute(off + 1);
+ int size = reader.readIntPositioned(off + 1);
int hdrSize = 1 /* flag */ + 4 /* size */ + 1 /* col type */;
writer.write(reader.array(), off, hdrSize);
@@ -78,7 +78,7 @@ class PortableLazySet extends PortableAbstractLazyValue {
/** {@inheritDoc} */
@Override protected Object init() {
- int size = reader.readIntAbsolute(off + 1);
+ int size = reader.readIntPositioned(off + 1);
reader.position(off + 1/* flag */ + 4/* size */ + 1/* col type */);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
index 107b02e..7dbee92 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractInputStream.java
@@ -158,13 +158,23 @@ public abstract class PortableAbstractInputStream extends PortableAbstractStream
}
/** {@inheritDoc} */
- @Override public int readInt(int pos) {
+ @Override public short readShortPositioned(int pos) {
+ int delta = pos + 2 - this.pos;
+
+ if (delta > 0)
+ ensureEnoughData(delta);
+
+ return readShortPositioned0(pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readIntPositioned(int pos) {
int delta = pos + 4 - this.pos;
if (delta > 0)
ensureEnoughData(delta);
- return readIntPositioned(pos);
+ return readIntPositioned0(pos);
}
/** {@inheritDoc} */
@@ -334,10 +344,18 @@ public abstract class PortableAbstractInputStream extends PortableAbstractStream
protected abstract long readLongFast();
/**
+ * Internal routine for positioned short value read.
+ *
+ * @param pos Position.
+ * @return Short value.
+ */
+ protected abstract short readShortPositioned0(int pos);
+
+ /**
* Internal routine for positioned int value read.
*
* @param pos Position.
* @return Int value.
*/
- protected abstract int readIntPositioned(int pos);
+ protected abstract int readIntPositioned0(int pos);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
index 78f46ca..c943682 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
@@ -120,6 +120,13 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea
}
/** {@inheritDoc} */
+ @Override public void writeShort(int pos, short val) {
+ ensureCapacity(pos + 2);
+
+ writeShortPositioned(pos, val);
+ }
+
+ /** {@inheritDoc} */
@Override public void writeInt(int pos, int val) {
ensureCapacity(pos + 4);
@@ -307,6 +314,14 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea
protected abstract void writeLongFast(long val);
/**
+ * Write short value to the given position.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ protected abstract void writeShortPositioned(int pos, short val);
+
+ /**
* Write int value to the given position.
*
* @param pos Position.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
index c7ec576..adfeaad 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
@@ -123,7 +123,17 @@ public final class PortableHeapInputStream extends PortableAbstractInputStream {
}
/** {@inheritDoc} */
- @Override protected int readIntPositioned(int pos) {
+ @Override protected short readShortPositioned0(int pos) {
+ short res = UNSAFE.getShort(data, BYTE_ARR_OFF + pos);
+
+ if (!LITTLE_ENDIAN)
+ res = Short.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntPositioned0(int pos) {
int res = UNSAFE.getInt(data, BYTE_ARR_OFF + pos);
if (!LITTLE_ENDIAN)
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
index 2abb69c..208ad33 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
@@ -147,6 +147,14 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream
}
/** {@inheritDoc} */
+ @Override protected void writeShortPositioned(int pos, short val) {
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
@Override protected void writeIntPositioned(int pos, int val) {
if (!LITTLE_ENDIAN)
val = Integer.reverseBytes(val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
index a1d7fd5..a2273d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableInputStream.java
@@ -17,10 +17,12 @@
package org.apache.ignite.internal.portable.streams;
+import org.apache.ignite.internal.portable.PortablePositionReadable;
+
/**
* Portable input stream.
*/
-public interface PortableInputStream extends PortableStream {
+public interface PortableInputStream extends PortableStream, PortablePositionReadable {
/**
* Read byte value.
*
@@ -99,14 +101,6 @@ public interface PortableInputStream extends PortableStream {
public int readInt();
/**
- * Read int value at the given position.
- *
- * @param pos Position.
- * @return Value.
- */
- public int readInt(int pos);
-
- /**
* Read int array.
*
* @param cnt Expected item count.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
index f5ecf95..75bffb9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapInputStream.java
@@ -113,7 +113,17 @@ public class PortableOffheapInputStream extends PortableAbstractInputStream {
}
/** {@inheritDoc} */
- @Override protected int readIntPositioned(int pos) {
+ @Override protected short readShortPositioned0(int pos) {
+ short res = UNSAFE.getShort(ptr + pos);
+
+ if (!LITTLE_ENDIAN)
+ res = Short.reverseBytes(res);
+
+ return res;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int readIntPositioned0(int pos) {
int res = UNSAFE.getInt(ptr + pos);
if (!LITTLE_ENDIAN)
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
index 0deef90..430a176 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
@@ -125,6 +125,14 @@ public class PortableOffheapOutputStream extends PortableAbstractOutputStream {
}
/** {@inheritDoc} */
+ @Override protected void writeShortPositioned(int pos, short val) {
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ UNSAFE.putShort(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
@Override protected void writeIntPositioned(int pos, int val) {
if (!LITTLE_ENDIAN)
val = Integer.reverseBytes(val);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
index 745f9ee..0e25b12 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
@@ -85,6 +85,14 @@ public interface PortableOutputStream extends PortableStream, AutoCloseable {
public void writeInt(int val);
/**
+ * Write short value at the given position.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void writeShort(int pos, short val);
+
+ /**
* Write int value to the given position.
*
* @param pos Position.
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
index 1597d39..04c1e69 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectPortableProcessorImpl.java
@@ -430,7 +430,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
* @throws PortableException If failed.
*/
public byte[] marshal(@Nullable Object obj) throws PortableException {
- byte[] arr = portableMarsh.marshal(obj, 0);
+ byte[] arr = portableMarsh.marshal(obj);
assert arr.length > 0;
@@ -515,7 +515,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
return new GridMapEntry<>(marshalToPortable(e.getKey()), marshalToPortable(e.getValue()));
}
- byte[] arr = portableMarsh.marshal(obj, 0);
+ byte[] arr = portableMarsh.marshal(obj);
assert arr.length > 0;
@@ -721,7 +721,7 @@ public class CacheObjectPortableProcessorImpl extends IgniteCacheObjectProcessor
if (!((CacheObjectPortableContext)ctx).portableEnabled() || portableMarsh == null)
return super.marshal(ctx, val);
- byte[] arr = portableMarsh.marshal(val, 0);
+ byte[] arr = portableMarsh.marshal(val);
assert arr.length > 0;
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
index b54b151..a4d711e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianInputStreamImpl.java
@@ -65,8 +65,13 @@ public class PlatformBigEndianInputStreamImpl extends PlatformInputStreamImpl {
}
/** {@inheritDoc} */
- @Override public int readInt(int pos) {
- return Integer.reverseBytes(super.readInt(pos));
+ @Override public short readShortPositioned(int pos) {
+ return Short.reverseBytes(super.readShortPositioned(pos));
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readIntPositioned(int pos) {
+ return Integer.reverseBytes(super.readIntPositioned(pos));
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
index 0f6ccbc..e5fd71b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformBigEndianOutputStreamImpl.java
@@ -99,6 +99,11 @@ public class PlatformBigEndianOutputStreamImpl extends PlatformOutputStreamImpl
}
/** {@inheritDoc} */
+ @Override public void writeShort(int pos, short val) {
+ super.writeShort(pos, Short.reverseBytes(val));
+ }
+
+ /** {@inheritDoc} */
@Override public void writeInt(int pos, int val) {
super.writeInt(pos, Integer.reverseBytes(val));
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
index 03a166e..68b4141 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformInputStreamImpl.java
@@ -146,7 +146,17 @@ public class PlatformInputStreamImpl implements PlatformInputStream {
}
/** {@inheritDoc} */
- @Override public int readInt(int pos) {
+ @Override public short readShortPositioned(int pos) {
+ int delta = pos + 2 - this.pos;
+
+ if (delta > 0)
+ ensureEnoughData(delta);
+
+ return UNSAFE.getShort(data + pos);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int readIntPositioned(int pos) {
int delta = pos + 4 - this.pos;
if (delta > 0)
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
index 13c3dd3..16b1567 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/memory/PlatformOutputStreamImpl.java
@@ -120,6 +120,13 @@ public class PlatformOutputStreamImpl implements PlatformOutputStream {
}
/** {@inheritDoc} */
+ @Override public void writeShort(int pos, short val) {
+ ensureCapacity(pos + 2);
+
+ UNSAFE.putShort(data + pos, val);
+ }
+
+ /** {@inheritDoc} */
@Override public void writeInt(int pos, int val) {
ensureCapacity(pos + 4);
http://git-wip-us.apache.org/repos/asf/ignite/blob/b85fa171/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
index 649e69d..f9cf509 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
@@ -260,7 +260,7 @@ public class PortableMarshaller extends AbstractMarshaller {
/** {@inheritDoc} */
@Override public byte[] marshal(@Nullable Object obj) throws IgniteCheckedException {
- return impl.marshal(obj, 0);
+ return impl.marshal(obj);
}
/** {@inheritDoc} */