You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/20 07:28:57 UTC
[2/5] ignite git commit: IGNITE-1917: Binary protocol performance
optimizations.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java
index 8543ce6..3edf980 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableClassDescriptor.java
@@ -69,7 +69,7 @@ public class PortableClassDescriptor {
private final BinaryIdMapper idMapper;
/** */
- private final Mode mode;
+ private final BinaryWriteMode mode;
/** */
private final boolean userType;
@@ -87,7 +87,7 @@ public class PortableClassDescriptor {
private final Constructor<?> ctor;
/** */
- private final Collection<FieldInfo> fields;
+ private final BinaryFieldAccessor[] fields;
/** */
private final Method writeReplaceMtd;
@@ -99,7 +99,7 @@ public class PortableClassDescriptor {
private final Map<String, Integer> stableFieldsMeta;
/** Object schemas. Initialized only for serializable classes and contains only 1 entry. */
- private final Collection<PortableSchema> stableSchemas;
+ private final PortableSchema stableSchema;
/** Schema registry. */
private final PortableSchemaRegistry schemaReg;
@@ -167,9 +167,9 @@ public class PortableClassDescriptor {
useOptMarshaller = !predefined && initUseOptimizedMarshallerFlag();
if (excluded)
- mode = Mode.EXCLUSION;
+ mode = BinaryWriteMode.EXCLUSION;
else
- mode = serializer != null ? Mode.PORTABLE : mode(cls);
+ mode = serializer != null ? BinaryWriteMode.PORTABLE : PortableUtils.mode(cls);
switch (mode) {
case BYTE:
@@ -210,7 +210,7 @@ public class PortableClassDescriptor {
ctor = null;
fields = null;
stableFieldsMeta = null;
- stableSchemas = null;
+ stableSchema = null;
break;
@@ -219,13 +219,13 @@ public class PortableClassDescriptor {
ctor = constructor(cls);
fields = null;
stableFieldsMeta = null;
- stableSchemas = null;
+ stableSchema = null;
break;
case OBJECT:
ctor = constructor(cls);
- fields = new ArrayList<>();
+ ArrayList<BinaryFieldAccessor> fields0 = new ArrayList<>();
stableFieldsMeta = metaDataEnabled ? new HashMap<String, Integer>() : null;
PortableSchema.Builder schemaBuilder = PortableSchema.Builder.newBuilder();
@@ -250,20 +250,22 @@ public class PortableClassDescriptor {
if (!ids.add(fieldId))
throw new BinaryObjectException("Duplicate field ID: " + name);
- FieldInfo fieldInfo = new FieldInfo(f, fieldId);
+ BinaryFieldAccessor fieldInfo = BinaryFieldAccessor.create(f, fieldId);
- fields.add(fieldInfo);
+ fields0.add(fieldInfo);
schemaBuilder.addField(fieldId);
if (metaDataEnabled)
- stableFieldsMeta.put(name, fieldInfo.fieldMode().typeId());
+ stableFieldsMeta.put(name, fieldInfo.mode().typeId());
}
}
}
-
- stableSchemas = Collections.singleton(schemaBuilder.build());
-
+
+ fields = fields0.toArray(new BinaryFieldAccessor[fields0.size()]);
+
+ stableSchema = schemaBuilder.build();
+
break;
default:
@@ -271,7 +273,8 @@ public class PortableClassDescriptor {
throw new BinaryObjectException("Invalid mode: " + mode);
}
- if (mode == Mode.PORTABLE || mode == Mode.EXTERNALIZABLE || mode == Mode.OBJECT) {
+ if (mode == BinaryWriteMode.PORTABLE || mode == BinaryWriteMode.EXTERNALIZABLE ||
+ mode == BinaryWriteMode.OBJECT) {
readResolveMtd = U.findNonPublicMethod(cls, "readResolve");
writeReplaceMtd = U.findNonPublicMethod(cls, "writeReplace");
}
@@ -310,10 +313,10 @@ public class PortableClassDescriptor {
}
/**
- * @return Schemas.
+ * @return Schema.
*/
- Collection<PortableSchema> schemas() {
- return stableSchemas;
+ PortableSchema schema() {
+ return stableSchema;
}
/**
@@ -380,52 +383,46 @@ public class PortableClassDescriptor {
assert obj != null;
assert writer != null;
+ writer.typeId(typeId);
+
switch (mode) {
case BYTE:
- writer.doWriteByte(GridPortableMarshaller.BYTE);
- writer.doWriteByte((byte)obj);
+ writer.writeByteFieldPrimitive((byte) obj);
break;
case SHORT:
- writer.doWriteByte(GridPortableMarshaller.SHORT);
- writer.doWriteShort((short)obj);
+ writer.writeShortFieldPrimitive((short)obj);
break;
case INT:
- writer.doWriteByte(GridPortableMarshaller.INT);
- writer.doWriteInt((int)obj);
+ writer.writeIntFieldPrimitive((int) obj);
break;
case LONG:
- writer.doWriteByte(GridPortableMarshaller.LONG);
- writer.doWriteLong((long)obj);
+ writer.writeLongFieldPrimitive((long) obj);
break;
case FLOAT:
- writer.doWriteByte(GridPortableMarshaller.FLOAT);
- writer.doWriteFloat((float)obj);
+ writer.writeFloatFieldPrimitive((float) obj);
break;
case DOUBLE:
- writer.doWriteByte(GridPortableMarshaller.DOUBLE);
- writer.doWriteDouble((double)obj);
+ writer.writeDoubleFieldPrimitive((double) obj);
break;
case CHAR:
- writer.doWriteByte(GridPortableMarshaller.CHAR);
- writer.doWriteChar((char)obj);
+ writer.writeCharFieldPrimitive((char) obj);
break;
case BOOLEAN:
- writer.doWriteByte(GridPortableMarshaller.BOOLEAN);
- writer.doWriteBoolean((boolean)obj);
+ writer.writeBooleanFieldPrimitive((boolean) obj);
break;
@@ -623,9 +620,11 @@ public class PortableClassDescriptor {
case OBJECT:
if (writeHeader(obj, writer)) {
try {
- for (FieldInfo info : fields)
+ for (BinaryFieldAccessor info : fields)
info.write(obj, writer);
+ writer.schemaId(stableSchema.schemaId());
+
writer.postWrite(userType);
}
finally {
@@ -683,7 +682,7 @@ public class PortableClassDescriptor {
reader.setHandler(res);
- for (FieldInfo info : fields)
+ for (BinaryFieldAccessor info : fields)
info.read(res, reader);
break;
@@ -723,12 +722,22 @@ public class PortableClassDescriptor {
if (writer.tryWriteAsHandle(obj))
return false;
- PortableUtils.writeHeader(
- writer,
- registered ? typeId : GridPortableMarshaller.UNREGISTERED_TYPE_ID,
- obj instanceof CacheObjectImpl ? 0 : obj.hashCode(),
- registered ? null : cls.getName()
- );
+ if (registered) {
+ PortableUtils.writeHeader(
+ writer,
+ typeId,
+ obj instanceof CacheObjectImpl ? 0 : obj.hashCode(),
+ null
+ );
+ }
+ else {
+ PortableUtils.writeHeader(
+ writer,
+ GridPortableMarshaller.UNREGISTERED_TYPE_ID,
+ obj instanceof CacheObjectImpl ? 0 : obj.hashCode(),
+ cls.getName()
+ );
+ }
return true;
}
@@ -794,658 +803,4 @@ public class PortableClassDescriptor {
return use;
}
-
- /**
- * @param cls Class.
- * @return Mode.
- */
- @SuppressWarnings("IfMayBeConditional")
- private static Mode mode(Class<?> cls) {
- assert cls != null;
-
- if (cls == byte.class || cls == Byte.class)
- return Mode.BYTE;
- else if (cls == short.class || cls == Short.class)
- return Mode.SHORT;
- else if (cls == int.class || cls == Integer.class)
- return Mode.INT;
- else if (cls == long.class || cls == Long.class)
- return Mode.LONG;
- else if (cls == float.class || cls == Float.class)
- return Mode.FLOAT;
- else if (cls == double.class || cls == Double.class)
- return Mode.DOUBLE;
- else if (cls == char.class || cls == Character.class)
- return Mode.CHAR;
- else if (cls == boolean.class || cls == Boolean.class)
- return Mode.BOOLEAN;
- else if (cls == BigDecimal.class)
- return Mode.DECIMAL;
- else if (cls == String.class)
- return Mode.STRING;
- else if (cls == UUID.class)
- return Mode.UUID;
- else if (cls == Date.class)
- return Mode.DATE;
- else if (cls == Timestamp.class)
- return Mode.TIMESTAMP;
- else if (cls == byte[].class)
- return Mode.BYTE_ARR;
- else if (cls == short[].class)
- return Mode.SHORT_ARR;
- else if (cls == int[].class)
- return Mode.INT_ARR;
- else if (cls == long[].class)
- return Mode.LONG_ARR;
- else if (cls == float[].class)
- return Mode.FLOAT_ARR;
- else if (cls == double[].class)
- return Mode.DOUBLE_ARR;
- else if (cls == char[].class)
- return Mode.CHAR_ARR;
- else if (cls == boolean[].class)
- return Mode.BOOLEAN_ARR;
- else if (cls == BigDecimal[].class)
- return Mode.DECIMAL_ARR;
- else if (cls == String[].class)
- return Mode.STRING_ARR;
- else if (cls == UUID[].class)
- return Mode.UUID_ARR;
- else if (cls == Date[].class)
- return Mode.DATE_ARR;
- else if (cls == Timestamp[].class)
- return Mode.TIMESTAMP_ARR;
- else if (cls.isArray())
- return cls.getComponentType().isEnum() ? Mode.ENUM_ARR : Mode.OBJECT_ARR;
- else if (cls == BinaryObjectImpl.class)
- return Mode.PORTABLE_OBJ;
- else if (Binarylizable.class.isAssignableFrom(cls))
- return Mode.PORTABLE;
- else if (Externalizable.class.isAssignableFrom(cls))
- return Mode.EXTERNALIZABLE;
- else if (Map.Entry.class.isAssignableFrom(cls))
- return Mode.MAP_ENTRY;
- else if (Collection.class.isAssignableFrom(cls))
- return Mode.COL;
- else if (Map.class.isAssignableFrom(cls))
- return Mode.MAP;
- else if (cls == BinaryObjectImpl.class)
- return Mode.PORTABLE_OBJ;
- else if (cls.isEnum())
- return Mode.ENUM;
- else if (cls == Class.class)
- return Mode.CLASS;
- else
- return Mode.OBJECT;
- }
-
- /** */
- private static class FieldInfo {
- /** */
- private final Field field;
-
- /** */
- private final int id;
-
- /** */
- private final Mode mode;
-
- /**
- * @param field Field.
- * @param id Field ID.
- */
- private FieldInfo(Field field, int id) {
- assert field != null;
-
- this.field = field;
- this.id = id;
-
- Class<?> type = field.getType();
-
- mode = mode(type);
- }
-
- /**
- * @return Field mode.
- */
- public Mode fieldMode() {
- return mode;
- }
-
- /**
- * @param obj Object.
- * @param writer Writer.
- * @throws BinaryObjectException In case of error.
- */
- public void write(Object obj, BinaryWriterExImpl writer) throws BinaryObjectException {
- assert obj != null;
- assert writer != null;
-
- writer.writeFieldId(id);
-
- Object val;
-
- try {
- val = field.get(obj);
- }
- catch (IllegalAccessException e) {
- throw new BinaryObjectException("Failed to get value for field: " + field, e);
- }
-
- switch (mode) {
- case BYTE:
- writer.writeByteField((Byte)val);
-
- break;
-
- case SHORT:
- writer.writeShortField((Short)val);
-
- break;
-
- case INT:
- writer.writeIntField((Integer)val);
-
- break;
-
- case LONG:
- writer.writeLongField((Long)val);
-
- break;
-
- case FLOAT:
- writer.writeFloatField((Float)val);
-
- break;
-
- case DOUBLE:
- writer.writeDoubleField((Double)val);
-
- break;
-
- case CHAR:
- writer.writeCharField((Character)val);
-
- break;
-
- case BOOLEAN:
- writer.writeBooleanField((Boolean)val);
-
- break;
-
- case DECIMAL:
- writer.writeDecimalField((BigDecimal)val);
-
- break;
-
- case STRING:
- writer.writeStringField((String)val);
-
- break;
-
- case UUID:
- writer.writeUuidField((UUID)val);
-
- break;
-
- case DATE:
- writer.writeDateField((Date)val);
-
- break;
-
- case TIMESTAMP:
- writer.writeTimestampField((Timestamp)val);
-
- break;
-
- case BYTE_ARR:
- writer.writeByteArrayField((byte[])val);
-
- break;
-
- case SHORT_ARR:
- writer.writeShortArrayField((short[]) val);
-
- break;
-
- case INT_ARR:
- writer.writeIntArrayField((int[]) val);
-
- break;
-
- case LONG_ARR:
- writer.writeLongArrayField((long[]) val);
-
- break;
-
- case FLOAT_ARR:
- writer.writeFloatArrayField((float[]) val);
-
- break;
-
- case DOUBLE_ARR:
- writer.writeDoubleArrayField((double[]) val);
-
- break;
-
- case CHAR_ARR:
- writer.writeCharArrayField((char[]) val);
-
- break;
-
- case BOOLEAN_ARR:
- writer.writeBooleanArrayField((boolean[]) val);
-
- break;
-
- case DECIMAL_ARR:
- writer.writeDecimalArrayField((BigDecimal[]) val);
-
- break;
-
- case STRING_ARR:
- writer.writeStringArrayField((String[]) val);
-
- break;
-
- case UUID_ARR:
- writer.writeUuidArrayField((UUID[]) val);
-
- break;
-
- case DATE_ARR:
- writer.writeDateArrayField((Date[]) val);
-
- break;
-
- case TIMESTAMP_ARR:
- writer.writeTimestampArrayField((Timestamp[]) val);
-
- break;
-
- case OBJECT_ARR:
- writer.writeObjectArrayField((Object[])val);
-
- break;
-
- case COL:
- writer.writeCollectionField((Collection<?>)val);
-
- break;
-
- case MAP:
- writer.writeMapField((Map<?, ?>)val);
-
- break;
-
- case MAP_ENTRY:
- writer.writeMapEntryField((Map.Entry<?, ?>)val);
-
- break;
-
- case PORTABLE_OBJ:
- writer.writePortableObjectField((BinaryObjectImpl)val);
-
- break;
-
- case ENUM:
- writer.writeEnumField((Enum<?>)val);
-
- break;
-
- case ENUM_ARR:
- writer.writeEnumArrayField((Object[])val);
-
- break;
-
- case PORTABLE:
- case EXTERNALIZABLE:
- case OBJECT:
- writer.writeObjectField(val);
-
- break;
-
- case CLASS:
- writer.writeClassField((Class)val);
-
- break;
-
- default:
- assert false : "Invalid mode: " + mode;
- }
- }
-
- /**
- * @param obj Object.
- * @param reader Reader.
- * @throws BinaryObjectException In case of error.
- */
- public void read(Object obj, BinaryReaderExImpl reader) throws BinaryObjectException {
- Object val = null;
-
- switch (mode) {
- case BYTE:
- val = reader.readByte(id);
-
- break;
-
- case SHORT:
- val = reader.readShort(id);
-
- break;
-
- case INT:
- val = reader.readInt(id);
-
- break;
-
- case LONG:
- val = reader.readLong(id);
-
- break;
-
- case FLOAT:
- val = reader.readFloat(id);
-
- break;
-
- case DOUBLE:
- val = reader.readDouble(id);
-
- break;
-
- case CHAR:
- val = reader.readChar(id);
-
- break;
-
- case BOOLEAN:
- val = reader.readBoolean(id);
-
- break;
-
- case DECIMAL:
- val = reader.readDecimal(id);
-
- break;
-
- case STRING:
- val = reader.readString(id);
-
- break;
-
- case UUID:
- val = reader.readUuid(id);
-
- break;
-
- case DATE:
- val = reader.readDate(id);
-
- break;
-
- case TIMESTAMP:
- val = reader.readTimestamp(id);
-
- break;
-
- case BYTE_ARR:
- val = reader.readByteArray(id);
-
- break;
-
- case SHORT_ARR:
- val = reader.readShortArray(id);
-
- break;
-
- case INT_ARR:
- val = reader.readIntArray(id);
-
- break;
-
- case LONG_ARR:
- val = reader.readLongArray(id);
-
- break;
-
- case FLOAT_ARR:
- val = reader.readFloatArray(id);
-
- break;
-
- case DOUBLE_ARR:
- val = reader.readDoubleArray(id);
-
- break;
-
- case CHAR_ARR:
- val = reader.readCharArray(id);
-
- break;
-
- case BOOLEAN_ARR:
- val = reader.readBooleanArray(id);
-
- break;
-
- case DECIMAL_ARR:
- val = reader.readDecimalArray(id);
-
- break;
-
- case STRING_ARR:
- val = reader.readStringArray(id);
-
- break;
-
- case UUID_ARR:
- val = reader.readUuidArray(id);
-
- break;
-
- case DATE_ARR:
- val = reader.readDateArray(id);
-
- break;
-
- case TIMESTAMP_ARR:
- val = reader.readTimestampArray(id);
-
- break;
-
- case OBJECT_ARR:
- val = reader.readObjectArray(id);
-
- break;
-
- case COL:
- val = reader.readCollection(id, null);
-
- break;
-
- case MAP:
- val = reader.readMap(id, null);
-
- break;
-
- case MAP_ENTRY:
- val = reader.readMapEntry(id);
-
- break;
-
- case PORTABLE_OBJ:
- val = reader.readPortableObject(id);
-
- break;
-
- case ENUM:
- val = reader.readEnum(id, field.getType());
-
- break;
-
- case ENUM_ARR:
- val = reader.readEnumArray(id, field.getType().getComponentType());
-
- break;
-
- case PORTABLE:
- case EXTERNALIZABLE:
- case OBJECT:
- val = reader.readObject(id);
-
- break;
-
- case CLASS:
- val = reader.readClass(id);
-
- break;
-
- default:
- assert false : "Invalid mode: " + mode;
- }
-
- try {
- if (val != null || !field.getType().isPrimitive())
- field.set(obj, val);
- }
- catch (IllegalAccessException e) {
- throw new BinaryObjectException("Failed to set value for field: " + field, e);
- }
- }
- }
-
- /** */
- enum Mode {
- /** */
- BYTE(GridPortableMarshaller.BYTE),
-
- /** */
- SHORT(GridPortableMarshaller.SHORT),
-
- /** */
- INT(GridPortableMarshaller.INT),
-
- /** */
- LONG(GridPortableMarshaller.LONG),
-
- /** */
- FLOAT(GridPortableMarshaller.FLOAT),
-
- /** */
- DOUBLE(GridPortableMarshaller.DOUBLE),
-
- /** */
- CHAR(GridPortableMarshaller.CHAR),
-
- /** */
- BOOLEAN(GridPortableMarshaller.BOOLEAN),
-
- /** */
- DECIMAL(GridPortableMarshaller.DECIMAL),
-
- /** */
- STRING(GridPortableMarshaller.STRING),
-
- /** */
- UUID(GridPortableMarshaller.UUID),
-
- /** */
- DATE(GridPortableMarshaller.DATE),
-
- /** */
- TIMESTAMP(GridPortableMarshaller.TIMESTAMP),
-
- /** */
- BYTE_ARR(GridPortableMarshaller.BYTE_ARR),
-
- /** */
- SHORT_ARR(GridPortableMarshaller.SHORT_ARR),
-
- /** */
- INT_ARR(GridPortableMarshaller.INT_ARR),
-
- /** */
- LONG_ARR(GridPortableMarshaller.LONG_ARR),
-
- /** */
- FLOAT_ARR(GridPortableMarshaller.FLOAT_ARR),
-
- /** */
- DOUBLE_ARR(GridPortableMarshaller.DOUBLE_ARR),
-
- /** */
- CHAR_ARR(GridPortableMarshaller.CHAR_ARR),
-
- /** */
- BOOLEAN_ARR(GridPortableMarshaller.BOOLEAN_ARR),
-
- /** */
- DECIMAL_ARR(GridPortableMarshaller.DECIMAL_ARR),
-
- /** */
- STRING_ARR(GridPortableMarshaller.STRING_ARR),
-
- /** */
- UUID_ARR(GridPortableMarshaller.UUID_ARR),
-
- /** */
- DATE_ARR(GridPortableMarshaller.DATE_ARR),
-
- /** */
- TIMESTAMP_ARR(GridPortableMarshaller.TIMESTAMP_ARR),
-
- /** */
- OBJECT_ARR(GridPortableMarshaller.OBJ_ARR),
-
- /** */
- COL(GridPortableMarshaller.COL),
-
- /** */
- MAP(GridPortableMarshaller.MAP),
-
- /** */
- MAP_ENTRY(GridPortableMarshaller.MAP_ENTRY),
-
- /** */
- PORTABLE_OBJ(GridPortableMarshaller.OBJ),
-
- /** */
- ENUM(GridPortableMarshaller.ENUM),
-
- /** */
- ENUM_ARR(GridPortableMarshaller.ENUM_ARR),
-
- /** */
- CLASS(GridPortableMarshaller.CLASS),
-
- /** */
- PORTABLE(GridPortableMarshaller.PORTABLE_OBJ),
-
- /** */
- EXTERNALIZABLE(GridPortableMarshaller.OBJ),
-
- /** */
- OBJECT(GridPortableMarshaller.OBJ),
-
- /** */
- EXCLUSION(GridPortableMarshaller.OBJ);
-
- /** */
- private final int typeId;
-
- /**
- * @param typeId Type ID.
- */
- Mode(int typeId) {
- this.typeId = typeId;
- }
-
- /**
- * @return Type ID.
- */
- int typeId() {
- return typeId;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
index afc23e1..e3caba4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableContext.java
@@ -31,6 +31,7 @@ import java.net.URLClassLoader;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
@@ -573,8 +574,9 @@ public class PortableContext implements Externalizable {
mappers.putIfAbsent(typeId, idMapper);
- metaHnd.addMeta(typeId,
- new BinaryMetadata(typeId, typeName, desc.fieldsMeta(), null, desc.schemas()).wrap(this));
+ Collection<PortableSchema> schemas = desc.schema() != null ? Collections.singleton(desc.schema()) : null;
+
+ metaHnd.addMeta(typeId, new BinaryMetadata(typeId, typeName, desc.fieldsMeta(), null, schemas).wrap(this));
return desc;
}
@@ -782,7 +784,7 @@ public class PortableContext implements Externalizable {
);
fieldsMeta = desc.fieldsMeta();
- schemas = desc.schemas();
+ schemas = desc.schema() != null ? Collections.singleton(desc.schema()) : null;
if (IgniteUtils.detectClassLoader(cls).equals(dfltLdr))
userTypes.put(id, desc);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderContext.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderContext.java
deleted file mode 100644
index 869f81d..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableReaderContext.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable;
-
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.ignite.internal.util.typedef.internal.S;
-import org.apache.ignite.binary.BinaryObject;
-import org.apache.ignite.lang.IgniteBiTuple;
-import org.jetbrains.annotations.Nullable;
-
-/**
-* Reader context.
-*/
-class PortableReaderContext {
- /** */
- private Object oHandles;
-
- /** */
- private Map<Integer, BinaryObject> poHandles;
-
- /**
- * @param handle Handle.
- * @param obj Object.
- */
- @SuppressWarnings("unchecked")
- void setObjectHandler(int handle, Object obj) {
- assert obj != null;
-
- if (oHandles == null)
- oHandles = new IgniteBiTuple(handle, obj);
- else if (oHandles instanceof IgniteBiTuple) {
- Map map = new HashMap(3, 1.0f);
-
- IgniteBiTuple t = (IgniteBiTuple)oHandles;
-
- map.put(t.getKey(), t.getValue());
- map.put(handle, obj);
-
- oHandles = map;
- }
- else
- ((Map)oHandles).put(handle, obj);
- }
-
- /**
- * @param handle Handle.
- * @param po Portable object.
- */
- void setPortableHandler(int handle, BinaryObject po) {
- assert po != null;
-
- if (poHandles == null)
- poHandles = new HashMap<>(3, 1.0f);
-
- poHandles.put(handle, po);
- }
-
- /**
- * @param handle Handle.
- * @return Object.
- */
- @Nullable Object getObjectByHandle(int handle) {
- if (oHandles != null) {
- if (oHandles instanceof IgniteBiTuple) {
- IgniteBiTuple t = (IgniteBiTuple)oHandles;
-
- if ((int)t.get1() == handle)
- return t.get2();
- }
- else
- return ((Map)oHandles).get(handle);
- }
-
- return null;
- }
-
- /**
- * @param handle Handle.
- * @return Object.
- */
- @Nullable BinaryObject getPortableByHandle(int handle) {
- return poHandles != null ? poHandles.get(handle) : null;
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(PortableReaderContext.class, this);
- }
-}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
index 86ca5f8..72a96b9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
@@ -17,14 +17,11 @@
package org.apache.ignite.internal.portable;
-import org.apache.ignite.internal.util.typedef.internal.U;
-
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -41,14 +38,26 @@ public class PortableSchema implements Externalizable {
/** Order returned if field is not found. */
public static final int ORDER_NOT_FOUND = -1;
- /** Inline flag. */
- private boolean inline;
+ /** Minimum sensible size. */
+ private static final int MAP_MIN_SIZE = 32;
- /** Map with ID to order. */
- private HashMap<Integer, Integer> idToOrder;
+ /** Empty cell. */
+ private static final int MAP_EMPTY = 0;
+
+ /** Schema ID. */
+ private int schemaId;
/** IDs depending on order. */
- private ArrayList<Integer> ids;
+ private int[] ids;
+
+ /** Interned names of associated fields. */
+ private String[] names;
+
+ /** ID-to-order data. */
+ private int[] idToOrderData;
+
+ /** ID-to-order mask. */
+ private int idToOrderMask;
/** ID 1. */
private int id0;
@@ -62,21 +71,6 @@ public class PortableSchema implements Externalizable {
/** ID 4. */
private int id3;
- /** ID 1. */
- private int id4;
-
- /** ID 2. */
- private int id5;
-
- /** ID 3. */
- private int id6;
-
- /** ID 4. */
- private int id7;
-
- /** Schema ID. */
- private int schemaId;
-
/**
* {@link Externalizable} support.
*/
@@ -91,39 +85,11 @@ public class PortableSchema implements Externalizable {
* @param fieldIds Field IDs.
*/
private PortableSchema(int schemaId, List<Integer> fieldIds) {
- this.schemaId = schemaId;
+ assert fieldIds != null;
- if (fieldIds.size() <= 8) {
- inline = true;
-
- Iterator<Integer> iter = fieldIds.iterator();
-
- id0 = iter.hasNext() ? iter.next() : 0;
- id1 = iter.hasNext() ? iter.next() : 0;
- id2 = iter.hasNext() ? iter.next() : 0;
- id3 = iter.hasNext() ? iter.next() : 0;
- id4 = iter.hasNext() ? iter.next() : 0;
- id5 = iter.hasNext() ? iter.next() : 0;
- id6 = iter.hasNext() ? iter.next() : 0;
- id7 = iter.hasNext() ? iter.next() : 0;
-
- idToOrder = null;
- }
- else {
- inline = false;
-
- id0 = id1 = id2 = id3 = id4 = id5 = id6 = id7 = 0;
-
- ids = new ArrayList<>();
- idToOrder = new HashMap<>();
-
- for (int i = 0; i < fieldIds.size(); i++) {
- int fieldId = fieldIds.get(i);
+ this.schemaId = schemaId;
- ids.add(fieldId);
- idToOrder.put(fieldId, i);
- }
- }
+ initialize(fieldIds);
}
/**
@@ -134,46 +100,51 @@ public class PortableSchema implements Externalizable {
}
/**
- * Get field ID by order in footer.
+ * Try speculatively confirming order for the given field name.
*
- * @param order Order.
+ * @param expOrder Expected order.
+ * @param expName Expected name.
* @return Field ID.
*/
- public int fieldId(int order) {
- if (inline) {
- switch (order) {
- case 0:
- return id0;
+ @SuppressWarnings("StringEquality")
+ public Confirmation confirmOrder(int expOrder, String expName) {
+ assert expName != null;
- case 1:
- return id1;
+ if (expOrder < names.length) {
+ String name = names[expOrder];
- case 2:
- return id2;
+ // Note that we use only reference equality assuming that field names are interned literals.
+ if (name == expName)
+ return Confirmation.CONFIRMED;
- case 3:
- return id3;
-
- case 4:
- return id4;
-
- case 5:
- return id5;
+ if (name == null)
+ return Confirmation.CLARIFY;
+ }
- case 6:
- return id6;
+ return Confirmation.REJECTED;
+ }
- case 7:
- return id7;
+ /**
+ * Add field name.
+ *
+ * @param order Order.
+ * @param name Name.
+ */
+ public void clarifyFieldName(int order, String name) {
+ assert name != null;
+ assert order < names.length;
- default:
- assert false : "Should not reach here.";
+ names[order] = name.intern();
+ }
- return 0;
- }
- }
- else
- return ids.get(order);
+ /**
+ * Get field ID by order in footer.
+ *
+ * @param order Order.
+ * @return Field ID.
+ */
+ public int fieldId(int order) {
+ return order < ids.length ? ids[order] : 0;
}
/**
@@ -183,7 +154,7 @@ public class PortableSchema implements Externalizable {
* @return Offset or {@code 0} if there is no such field.
*/
public int order(int id) {
- if (inline) {
+ if (idToOrderData == null) {
if (id == id0)
return 0;
@@ -196,24 +167,34 @@ public class PortableSchema implements Externalizable {
if (id == id3)
return 3;
- if (id == id4)
- return 4;
+ return ORDER_NOT_FOUND;
+ }
+ else {
+ int idx = (id & idToOrderMask) << 1;
+
+ int curId = idToOrderData[idx];
- if (id == id5)
- return 5;
+ if (id == curId) // Hit!
+ return idToOrderData[idx + 1];
+ else if (curId == MAP_EMPTY) // No such ID!
+ return ORDER_NOT_FOUND;
+ else {
+ // Unlikely collision scenario.
+ for (int i = 2; i < idToOrderData.length; i += 2) {
+ int newIdx = (idx + i) % idToOrderData.length;
- if (id == id6)
- return 6;
+ assert newIdx < idToOrderData.length - 1;
- if (id == id7)
- return 7;
+ curId = idToOrderData[newIdx];
- return ORDER_NOT_FOUND;
- }
- else {
- Integer order = idToOrder.get(id);
+ if (id == curId)
+ return idToOrderData[newIdx + 1];
+ else if (curId == MAP_EMPTY)
+ return ORDER_NOT_FOUND;
+ }
- return order != null ? order : ORDER_NOT_FOUND;
+ return ORDER_NOT_FOUND;
+ }
}
}
@@ -231,59 +212,173 @@ public class PortableSchema implements Externalizable {
@Override public void writeExternal(ObjectOutput out) throws IOException {
out.writeInt(schemaId);
- if (inline) {
- out.writeBoolean(true);
-
- out.writeInt(id0);
- out.writeInt(id1);
- out.writeInt(id2);
- out.writeInt(id3);
- out.writeInt(id4);
- out.writeInt(id5);
- out.writeInt(id6);
- out.writeInt(id7);
- }
- else {
- out.writeBoolean(false);
-
- out.writeInt(ids.size());
+ out.writeInt(ids.length);
- for (Integer id : ids)
- out.writeInt(id);
- }
+ for (Integer id : ids)
+ out.writeInt(id);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
schemaId = in.readInt();
- if (in.readBoolean()) {
- inline = true;
-
- id0 = in.readInt();
- id1 = in.readInt();
- id2 = in.readInt();
- id3 = in.readInt();
- id4 = in.readInt();
- id5 = in.readInt();
- id6 = in.readInt();
- id7 = in.readInt();
+ int idsCnt = in.readInt();
+
+ List<Integer> fieldIds = new ArrayList<>(idsCnt);
+
+ for (int i = 0; i < idsCnt; i++)
+ fieldIds.add(in.readInt());
+
+ initialize(fieldIds);
+ }
+
+ /**
+ * Parse values.
+ *
+ * @param vals Values.
+ * @param size Proposed result size.
+ * @return Parse result.
+ */
+ private static ParseResult parse(int[] vals, int size) {
+ int mask = maskForPowerOfTwo(size);
+
+ int totalSize = size * 2;
+
+ int[] data = new int[totalSize];
+ int collisions = 0;
+
+ for (int order = 0; order < vals.length; order++) {
+ int id = vals[order];
+
+ assert id != 0;
+
+ int idIdx = (id & mask) << 1;
+
+ if (data[idIdx] == 0) {
+ // Found empty slot.
+ data[idIdx] = id;
+ data[idIdx + 1] = order;
+ }
+ else {
+ // Collision!
+ collisions++;
+
+ boolean placeFound = false;
+
+ for (int i = 2; i < totalSize; i += 2) {
+ int newIdIdx = (idIdx + i) % totalSize;
+
+ if (data[newIdIdx] == 0) {
+ data[newIdIdx] = id;
+ data[newIdIdx + 1] = order;
+
+ placeFound = true;
+
+ break;
+ }
+ }
+
+ assert placeFound : "Should always have a place for entry!";
+ }
+ }
+
+ return new ParseResult(data, collisions);
+ }
+
+ /**
+ * Get next power of two which greater or equal to the given number.
+ * This implementation is not meant to be very efficient, so it is expected to be used relatively rare.
+ *
+ * @param val Number
+ * @return Nearest pow2.
+ */
+ private static int nextPowerOfTwo(int val) {
+ int res = 1;
+
+ while (res < val)
+ res = res << 1;
+
+ if (res < 0)
+ throw new IllegalArgumentException("Value is too big to find positive pow2: " + val);
+
+ return res;
+ }
+
+ /**
+ * Calculate mask for the given value which is a power of two.
+ *
+ * @param val Value.
+ * @return Mask.
+ */
+ private static int maskForPowerOfTwo(int val) {
+ int mask = 0;
+ int comparand = 1;
+
+ while (comparand < val) {
+ mask |= comparand;
+
+ comparand <<= 1;
+ }
+
+ return mask;
+ }
+
+ /**
+ * Initialization routine.
+ *
+ * @param fieldIds Field IDs.
+ */
+ private void initialize(List<Integer> fieldIds) {
+ ids = new int[fieldIds.size()];
+
+ for (int i = 0; i < fieldIds.size(); i++)
+ ids[i] = fieldIds.get(i);
+
+ names = new String[fieldIds.size()];
+
+ if (fieldIds.size() <= 4) {
+ Iterator<Integer> iter = fieldIds.iterator();
+
+ id0 = iter.hasNext() ? iter.next() : 0;
+ id1 = iter.hasNext() ? iter.next() : 0;
+ id2 = iter.hasNext() ? iter.next() : 0;
+ id3 = iter.hasNext() ? iter.next() : 0;
}
else {
- inline = false;
+ id0 = id1 = id2 = id3 = 0;
- int size = in.readInt();
+ initializeMap(ids);
+ }
+ }
- ids = new ArrayList<>(size);
- idToOrder = U.newHashMap(size);
+ /**
+ * Initialize the map.
+ *
+ * @param vals Values.
+ */
+ private void initializeMap(int[] vals) {
+ int size = Math.max(nextPowerOfTwo(vals.length) << 2, MAP_MIN_SIZE);
- for (int i = 0; i < size; i++) {
- int fieldId = in.readInt();
+ assert size > 0;
- ids.add(fieldId);
- idToOrder.put(fieldId, i);
- }
+ ParseResult finalRes;
+
+ ParseResult res1 = parse(vals, size);
+
+ if (res1.collisions == 0)
+ finalRes = res1;
+ else {
+ ParseResult res2 = parse(vals, size * 2);
+
+ // Failed to decrease aom
+ if (res2.collisions == 0)
+ finalRes = res2;
+ else
+ finalRes = parse(vals, size * 4);
}
+
+ idToOrderData = finalRes.data;
+ idToOrderMask = maskForPowerOfTwo(idToOrderData.length / 2);
}
/**
@@ -332,4 +427,40 @@ public class PortableSchema implements Externalizable {
return new PortableSchema(schemaId, fields);
}
}
+
+ /**
+ * Order confirmation result.
+ */
+ public enum Confirmation {
+ /** Confirmed. */
+ CONFIRMED,
+
+ /** Denied. */
+ REJECTED,
+
+ /** Field name clarification is needed. */
+ CLARIFY
+ }
+
+ /**
+ * Result of map parsing.
+ */
+ private static class ParseResult {
+ /** Data. */
+ private int[] data;
+
+ /** Collisions. */
+ private int collisions;
+
+ /**
+ * Constructor.
+ *
+ * @param data Data.
+ * @param collisions Collisions.
+ */
+ private ParseResult(int[] data, int collisions) {
+ this.data = data;
+ this.collisions = collisions;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableThreadLocalMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableThreadLocalMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableThreadLocalMemoryAllocator.java
deleted file mode 100644
index 8f5bfb2..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableThreadLocalMemoryAllocator.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable;
-
-import org.apache.ignite.internal.portable.streams.PortableMemoryAllocator;
-import org.apache.ignite.internal.util.GridUnsafe;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import sun.misc.Unsafe;
-
-import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
-
-/**
- * Thread-local memory allocator.
- */
-public class PortableThreadLocalMemoryAllocator implements PortableMemoryAllocator {
- /** Memory allocator instance. */
- public static final PortableThreadLocalMemoryAllocator THREAD_LOCAL_ALLOC =
- new PortableThreadLocalMemoryAllocator();
-
- /** Holders. */
- private static final ThreadLocal<ByteArrayHolder> holders = new ThreadLocal<>();
-
- /** Unsafe instance. */
- protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** Array offset: byte. */
- protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
- /**
- * Ensures singleton.
- */
- private PortableThreadLocalMemoryAllocator() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public byte[] allocate(int size) {
- ByteArrayHolder holder = holders.get();
-
- if (holder == null)
- holders.set(holder = new ByteArrayHolder());
-
- if (holder.acquired)
- return new byte[size];
-
- holder.acquired = true;
-
- if (holder.data == null || size > holder.data.length)
- holder.data = new byte[size];
-
- return holder.data;
- }
-
- /** {@inheritDoc} */
- @Override public byte[] reallocate(byte[] data, int size) {
- ByteArrayHolder holder = holders.get();
-
- assert holder != null;
-
- byte[] newData = new byte[size];
-
- if (holder.data == data)
- holder.data = newData;
-
- UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
-
- return newData;
- }
-
- /** {@inheritDoc} */
- @Override public void release(byte[] data, int maxMsgSize) {
- ByteArrayHolder holder = holders.get();
-
- assert holder != null;
-
- if (holder.data != data)
- return;
-
- holder.maxMsgSize = maxMsgSize;
- holder.acquired = false;
-
- holder.shrink();
- }
-
- /** {@inheritDoc} */
- @Override public long allocateDirect(int size) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public long reallocateDirect(long addr, int size) {
- return 0;
- }
-
- /** {@inheritDoc} */
- @Override public void releaseDirect(long addr) {
- // No-op
- }
-
- /**
- * Checks whether a thread-local array is acquired or not.
- * The function is used by Unit tests.
- *
- * @return {@code true} if acquired {@code false} otherwise.
- */
- public boolean isThreadLocalArrayAcquired() {
- ByteArrayHolder holder = holders.get();
-
- return holder != null && holder.acquired;
- }
-
- /**
- * Thread-local byte array holder.
- */
- private static class ByteArrayHolder {
- /** */
- private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000);
-
- /** Data array */
- private byte[] data;
-
- /** Max message size detected between checks. */
- private int maxMsgSize;
-
- /** Last time array size is checked. */
- private long lastCheck = U.currentTimeMillis();
-
- /** Whether the holder is acquired or not. */
- private boolean acquired;
-
- /**
- * Shrinks array size if needed.
- */
- private void shrink() {
- long now = U.currentTimeMillis();
-
- if (now - lastCheck >= CHECK_FREQ) {
- int halfSize = data.length >> 1;
-
- if (maxMsgSize < halfSize)
- data = new byte[halfSize];
-
- lastCheck = now;
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java
index 95ef9591..53d414c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableUtils.java
@@ -17,7 +17,9 @@
package org.apache.ignite.internal.portable;
+import org.apache.ignite.binary.Binarylizable;
import org.apache.ignite.internal.portable.builder.PortableLazyValue;
+import org.apache.ignite.internal.portable.streams.PortableOutputStream;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
@@ -26,6 +28,7 @@ import org.apache.ignite.binary.BinaryObject;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
+import java.io.Externalizable;
import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.Collection;
@@ -669,13 +672,14 @@ public class PortableUtils {
* @return Position where length should be written.
*/
public static int writeHeader(BinaryWriterExImpl writer, int typeId, int hashCode, @Nullable String clsName) {
- writer.doWriteByte(GridPortableMarshaller.OBJ);
- writer.doWriteByte(GridPortableMarshaller.PROTO_VER);
+ PortableOutputStream out = writer.out();
- writer.doWriteShort((short) 0);
-
- writer.doWriteInt(typeId);
- writer.doWriteInt(hashCode);
+ out.unsafeEnsure(12);
+ out.unsafeWriteByte(GridPortableMarshaller.OBJ);
+ out.unsafeWriteByte(GridPortableMarshaller.PROTO_VER);
+ out.unsafeWriteShort((short) 0);
+ out.unsafeWriteInt(typeId);
+ out.unsafeWriteInt(hashCode);
int reserved = writer.reserve(12);
@@ -903,4 +907,108 @@ public class PortableUtils {
oldMeta.affinityKeyFieldName(), mergedSchemas) : oldMeta;
}
}
+
+ /**
+ * @param cls Class.
+ * @return Mode.
+ */
+ @SuppressWarnings("IfMayBeConditional")
+ public static BinaryWriteMode mode(Class<?> cls) {
+ assert cls != null;
+
+ /** Primitives. */
+ if (cls == byte.class)
+ return BinaryWriteMode.P_BYTE;
+ else if (cls == boolean.class)
+ return BinaryWriteMode.P_BOOLEAN;
+ else if (cls == short.class)
+ return BinaryWriteMode.P_SHORT;
+ else if (cls == char.class)
+ return BinaryWriteMode.P_CHAR;
+ else if (cls == int.class)
+ return BinaryWriteMode.P_INT;
+ else if (cls == long.class)
+ return BinaryWriteMode.P_LONG;
+ else if (cls == float.class)
+ return BinaryWriteMode.P_FLOAT;
+ else if (cls == double.class)
+ return BinaryWriteMode.P_DOUBLE;
+
+ /** Boxed primitives. */
+ else if (cls == Byte.class)
+ return BinaryWriteMode.BYTE;
+ else if (cls == Boolean.class)
+ return BinaryWriteMode.BOOLEAN;
+ else if (cls == Short.class)
+ return BinaryWriteMode.SHORT;
+ else if (cls == Character.class)
+ return BinaryWriteMode.CHAR;
+ else if (cls == Integer.class)
+ return BinaryWriteMode.INT;
+ else if (cls == Long.class)
+ return BinaryWriteMode.LONG;
+ else if (cls == Float.class)
+ return BinaryWriteMode.FLOAT;
+ else if (cls == Double.class)
+ return BinaryWriteMode.DOUBLE;
+
+ /** The rest types. */
+ else if (cls == BigDecimal.class)
+ return BinaryWriteMode.DECIMAL;
+ else if (cls == String.class)
+ return BinaryWriteMode.STRING;
+ else if (cls == UUID.class)
+ return BinaryWriteMode.UUID;
+ else if (cls == Date.class)
+ return BinaryWriteMode.DATE;
+ else if (cls == Timestamp.class)
+ return BinaryWriteMode.TIMESTAMP;
+ else if (cls == byte[].class)
+ return BinaryWriteMode.BYTE_ARR;
+ else if (cls == short[].class)
+ return BinaryWriteMode.SHORT_ARR;
+ else if (cls == int[].class)
+ return BinaryWriteMode.INT_ARR;
+ else if (cls == long[].class)
+ return BinaryWriteMode.LONG_ARR;
+ else if (cls == float[].class)
+ return BinaryWriteMode.FLOAT_ARR;
+ else if (cls == double[].class)
+ return BinaryWriteMode.DOUBLE_ARR;
+ else if (cls == char[].class)
+ return BinaryWriteMode.CHAR_ARR;
+ else if (cls == boolean[].class)
+ return BinaryWriteMode.BOOLEAN_ARR;
+ else if (cls == BigDecimal[].class)
+ return BinaryWriteMode.DECIMAL_ARR;
+ else if (cls == String[].class)
+ return BinaryWriteMode.STRING_ARR;
+ else if (cls == UUID[].class)
+ return BinaryWriteMode.UUID_ARR;
+ else if (cls == Date[].class)
+ return BinaryWriteMode.DATE_ARR;
+ else if (cls == Timestamp[].class)
+ return BinaryWriteMode.TIMESTAMP_ARR;
+ else if (cls.isArray())
+ return cls.getComponentType().isEnum() ?
+ BinaryWriteMode.ENUM_ARR : BinaryWriteMode.OBJECT_ARR;
+ else if (cls == BinaryObjectImpl.class)
+ return BinaryWriteMode.PORTABLE_OBJ;
+ else if (Binarylizable.class.isAssignableFrom(cls))
+ return BinaryWriteMode.PORTABLE;
+ else if (Externalizable.class.isAssignableFrom(cls))
+ return BinaryWriteMode.EXTERNALIZABLE;
+ else if (Map.Entry.class.isAssignableFrom(cls))
+ return BinaryWriteMode.MAP_ENTRY;
+ else if (Collection.class.isAssignableFrom(cls))
+ return BinaryWriteMode.COL;
+ else if (Map.class.isAssignableFrom(cls))
+ return BinaryWriteMode.MAP;
+ else if (cls.isEnum())
+ return BinaryWriteMode.ENUM;
+ else if (cls == Class.class)
+ return BinaryWriteMode.CLASS;
+ else
+ return BinaryWriteMode.OBJECT;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java
index dfc2330..2ce2416 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/BinaryObjectBuilderImpl.java
@@ -178,7 +178,9 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
/** {@inheritDoc} */
@Override public BinaryObject build() {
- try (BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx, typeId, false)) {
+ try (BinaryWriterExImpl writer = new BinaryWriterExImpl(ctx)) {
+ writer.typeId(typeId);
+
PortableBuilderSerializer serializationCtx = new PortableBuilderSerializer();
serializationCtx.registerObjectWriting(this, 0);
@@ -206,7 +208,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
Set<Integer> remainsFlds = null;
if (reader != null) {
- PortableSchema schema = reader.schema(start);
+ PortableSchema schema = reader.schema();
Map<Integer, Object> assignedFldsById;
@@ -440,7 +442,7 @@ public class BinaryObjectBuilderImpl implements BinaryObjectBuilder {
int fieldIdLen = PortableUtils.fieldIdLength(flags);
int fieldOffsetLen = PortableUtils.fieldOffsetLength(flags);
- PortableSchema schema = reader.schema(start);
+ PortableSchema schema = reader.schema();
Map<Integer, Object> readCache = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java
index b6a6b54..538c26c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/builder/PortableBuilderReader.java
@@ -21,6 +21,8 @@ import java.sql.Timestamp;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
+
+import org.apache.ignite.internal.portable.BinaryReaderHandles;
import org.apache.ignite.internal.portable.GridPortableMarshaller;
import org.apache.ignite.internal.portable.PortableContext;
import org.apache.ignite.internal.portable.PortablePositionReadable;
@@ -31,6 +33,7 @@ import org.apache.ignite.internal.portable.PortableSchema;
import org.apache.ignite.internal.portable.PortableUtils;
import org.apache.ignite.internal.portable.BinaryWriterExImpl;
import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.portable.streams.PortableHeapInputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.ignite.internal.portable.GridPortableMarshaller.NULL;
@@ -41,21 +44,23 @@ import static org.apache.ignite.internal.portable.GridPortableMarshaller.STRING;
*/
public class PortableBuilderReader implements PortablePositionReadable {
/** */
- private final Map<Integer, BinaryObjectBuilderImpl> objMap = new HashMap<>();
+ private final PortableContext ctx;
/** */
- private final PortableContext ctx;
+ private final byte[] arr;
/** */
private final BinaryReaderExImpl reader;
/** */
- private byte[] arr;
+ private final Map<Integer, BinaryObjectBuilderImpl> objMap;
/** */
private int pos;
- /**
+ /*
+ * Constructor.
+ *
* @param objImpl Portable object
*/
PortableBuilderReader(BinaryObjectImpl objImpl) {
@@ -64,7 +69,25 @@ public class PortableBuilderReader implements PortablePositionReadable {
pos = objImpl.start();
// TODO: IGNITE-1272 - Is class loader needed here?
- reader = new BinaryReaderExImpl(ctx, arr, pos, null);
+ reader = new BinaryReaderExImpl(ctx, PortableHeapInputStream.create(arr, pos), null, new BinaryReaderHandles());
+
+ objMap = new HashMap<>();
+ }
+
+ /**
+ * Copying constructor.
+ *
+ * @param other Other reader.
+ * @param start Start position.
+ */
+ PortableBuilderReader(PortableBuilderReader other, int start) {
+ this.ctx = other.ctx;
+ this.arr = other.arr;
+ this.pos = start;
+
+ reader = new BinaryReaderExImpl(ctx, PortableHeapInputStream.create(arr, start), null, other.reader.handles());
+
+ this.objMap = other.objMap;
}
/**
@@ -84,19 +107,10 @@ public class PortableBuilderReader implements PortablePositionReadable {
/**
* Get schema of the object, starting at the given position.
*
- * @param start Start position.
* @return Object's schema.
*/
- public PortableSchema schema(int start) {
- // We can use current reader in case start is equal to initially recorded position.
- BinaryReaderExImpl targetReader;
-
- if (start == pos)
- targetReader = reader;
- else
- targetReader = new BinaryReaderExImpl(ctx, arr, start, null);
-
- return targetReader.getOrCreateSchema();
+ public PortableSchema schema() {
+ return reader.getOrCreateSchema();
}
/**
@@ -367,7 +381,7 @@ public class PortableBuilderReader implements PortablePositionReadable {
BinaryObjectBuilderImpl res = objMap.get(objStart);
if (res == null) {
- res = new BinaryObjectBuilderImpl(this, objStart);
+ res = new BinaryObjectBuilderImpl(new PortableBuilderReader(this, objStart), objStart);
objMap.put(objStart, res);
}
@@ -379,7 +393,7 @@ public class PortableBuilderReader implements PortablePositionReadable {
BinaryObjectBuilderImpl res = objMap.get(pos);
if (res == null) {
- res = new BinaryObjectBuilderImpl(this, pos);
+ res = new BinaryObjectBuilderImpl(new PortableBuilderReader(this, pos), pos);
objMap.put(pos, res);
}
@@ -492,7 +506,7 @@ public class PortableBuilderReader implements PortablePositionReadable {
BinaryObjectBuilderImpl res = objMap.get(objStart);
if (res == null) {
- res = new BinaryObjectBuilderImpl(this, objStart);
+ res = new BinaryObjectBuilderImpl(new PortableBuilderReader(this, objStart), objStart);
objMap.put(objStart, res);
}
@@ -506,7 +520,7 @@ public class PortableBuilderReader implements PortablePositionReadable {
BinaryObjectBuilderImpl res = objMap.get(pos);
if (res == null) {
- res = new BinaryObjectBuilderImpl(this, pos);
+ res = new BinaryObjectBuilderImpl(new PortableBuilderReader(this, pos), pos);
objMap.put(pos, res);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
index c943682..b68e9d1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableAbstractOutputStream.java
@@ -123,14 +123,14 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea
@Override public void writeShort(int pos, short val) {
ensureCapacity(pos + 2);
- writeShortPositioned(pos, val);
+ unsafeWriteShort(pos, val);
}
/** {@inheritDoc} */
@Override public void writeInt(int pos, int val) {
ensureCapacity(pos + 4);
- writeIntPositioned(pos, val);
+ unsafeWriteInt(pos, val);
}
/** {@inheritDoc} */
@@ -247,6 +247,26 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea
return 0;
}
+ /** {@inheritDoc} */
+ @Override public void unsafeEnsure(int cap) {
+ ensureCapacity(pos + cap);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteBoolean(boolean val) {
+ unsafeWriteByte(val ? BYTE_ONE : BYTE_ZERO);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteFloat(float val) {
+ unsafeWriteInt(Float.floatToIntBits(val));
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteDouble(double val) {
+ unsafeWriteLong(Double.doubleToLongBits(val));
+ }
+
/**
* Calculate new capacity.
*
@@ -314,22 +334,6 @@ public abstract class PortableAbstractOutputStream extends PortableAbstractStrea
protected abstract void writeLongFast(long val);
/**
- * Write short value to the given position.
- *
- * @param pos Position.
- * @param val Value.
- */
- protected abstract void writeShortPositioned(int pos, short val);
-
- /**
- * Write int value to the given position.
- *
- * @param pos Position.
- * @param val Value.
- */
- protected abstract void writeIntPositioned(int pos, int val);
-
- /**
* Ensure capacity.
*
* @param cnt Required byte count.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
index e027d70..1b39950 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapInputStream.java
@@ -23,6 +23,23 @@ import java.util.Arrays;
* Portable off-heap input stream.
*/
public final class PortableHeapInputStream extends PortableAbstractInputStream {
+ /**
+ * Create stream with pointer set at the given position.
+ *
+ * @param data Data.
+ * @param pos Position.
+ * @return Stream.
+ */
+ public static PortableHeapInputStream create(byte[] data, int pos) {
+ assert pos < data.length;
+
+ PortableHeapInputStream stream = new PortableHeapInputStream(data);
+
+ stream.pos = pos;
+
+ return stream;
+ }
+
/** Data. */
private byte[] data;
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
index 208ad33..062a359 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableHeapOutputStream.java
@@ -17,73 +17,40 @@
package org.apache.ignite.internal.portable.streams;
-import static org.apache.ignite.internal.portable.PortableThreadLocalMemoryAllocator.DFLT_ALLOC;
-import static org.apache.ignite.internal.portable.PortableThreadLocalMemoryAllocator.THREAD_LOCAL_ALLOC;
-
/**
* Portable heap output stream.
*/
public final class PortableHeapOutputStream extends PortableAbstractOutputStream {
- /** Default capacity. */
- private static final int DFLT_CAP = 1024;
-
/** Allocator. */
- private final PortableMemoryAllocator alloc;
+ private final PortableMemoryAllocatorChunk chunk;
/** Data. */
private byte[] data;
/**
* Constructor.
- */
- public PortableHeapOutputStream() {
- this(DFLT_CAP, DFLT_ALLOC);
- }
-
- /**
- * Constructor.
*
* @param cap Initial capacity.
*/
public PortableHeapOutputStream(int cap) {
- this(cap, THREAD_LOCAL_ALLOC);
+ this(cap, PortableMemoryAllocator.INSTANCE.chunk());
}
/**
* Constructor.
*
- * @param cap Initial capacity.
- * @param alloc Allocator.
+ * @param cap Capacity.
+ * @param chunk Chunk.
*/
- public PortableHeapOutputStream(int cap, PortableMemoryAllocator alloc) {
- data = alloc.allocate(cap);
-
- this.alloc = alloc;
- }
+ public PortableHeapOutputStream(int cap, PortableMemoryAllocatorChunk chunk) {
+ this.chunk = chunk;
- /**
- * Constructor.
- *
- * @param data Data.
- */
- public PortableHeapOutputStream(byte[] data) {
- this(data, DFLT_ALLOC);
- }
-
- /**
- * Constructor.
- *
- * @param data Data.
- * @param alloc Allocator.
- */
- public PortableHeapOutputStream(byte[] data, PortableMemoryAllocator alloc) {
- this.data = data;
- this.alloc = alloc;
+ data = chunk.allocate(cap);
}
/** {@inheritDoc} */
@Override public void close() {
- alloc.release(data, pos);
+ chunk.release(data, pos);
}
/** {@inheritDoc} */
@@ -91,7 +58,7 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream
if (cnt > data.length) {
int newCap = capacity(data.length, cnt);
- data = alloc.reallocate(data, newCap);
+ data = chunk.reallocate(data, newCap);
}
}
@@ -147,18 +114,63 @@ public final class PortableHeapOutputStream extends PortableAbstractOutputStream
}
/** {@inheritDoc} */
- @Override protected void writeShortPositioned(int pos, short val) {
+ @Override public void unsafeWriteByte(byte val) {
+ UNSAFE.putByte(data, BYTE_ARR_OFF + pos++, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteShort(short val) {
if (!LITTLE_ENDIAN)
val = Short.reverseBytes(val);
UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+
+ shift(2);
}
/** {@inheritDoc} */
- @Override protected void writeIntPositioned(int pos, int val) {
+ @Override public void unsafeWriteShort(int pos, short val) {
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ UNSAFE.putShort(data, BYTE_ARR_OFF + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteChar(char val) {
+ if (!LITTLE_ENDIAN)
+ val = Character.reverseBytes(val);
+
+ UNSAFE.putChar(data, BYTE_ARR_OFF + pos, val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteInt(int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
+
+ shift(4);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteInt(int pos, int val) {
if (!LITTLE_ENDIAN)
val = Integer.reverseBytes(val);
UNSAFE.putInt(data, BYTE_ARR_OFF + pos, val);
}
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteLong(long val) {
+ if (!LITTLE_ENDIAN)
+ val = Long.reverseBytes(val);
+
+ UNSAFE.putLong(data, BYTE_ARR_OFF + pos, val);
+
+ shift(8);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
index 7ddb457..e16747b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocator.java
@@ -18,59 +18,40 @@
package org.apache.ignite.internal.portable.streams;
/**
- * Portable memory allocator.
+ * Thread-local memory allocator.
*/
-public interface PortableMemoryAllocator {
- /** Default memory allocator. */
- public static final PortableMemoryAllocator DFLT_ALLOC = new PortableSimpleMemoryAllocator();
+public final class PortableMemoryAllocator {
+ /** Memory allocator instance. */
+ public static final PortableMemoryAllocator INSTANCE = new PortableMemoryAllocator();
- /**
- * Allocate memory.
- *
- * @param size Size.
- * @return Data.
- */
- public byte[] allocate(int size);
+ /** Holders. */
+ private static final ThreadLocal<PortableMemoryAllocatorChunk> holders = new ThreadLocal<>();
/**
- * Reallocates memory.
- *
- * @param data Current data chunk.
- * @param size New size required.
- *
- * @return Data.
+ * Ensures singleton.
*/
- public byte[] reallocate(byte[] data, int size);
+ private PortableMemoryAllocator() {
+ // No-op.
+ }
- /**
- * Release memory.
- *
- * @param data Data.
- * @param maxMsgSize Max message size sent during the time the allocator is used.
- */
- public void release(byte[] data, int maxMsgSize);
+ public PortableMemoryAllocatorChunk chunk() {
+ PortableMemoryAllocatorChunk holder = holders.get();
- /**
- * Allocate memory.
- *
- * @param size Size.
- * @return Address.
- */
- public long allocateDirect(int size);
+ if (holder == null)
+ holders.set(holder = new PortableMemoryAllocatorChunk());
- /**
- * Reallocate memory.
- *
- * @param addr Address.
- * @param size Size.
- * @return Address.
- */
- public long reallocateDirect(long addr, int size);
+ return holder;
+ }
/**
- * Release memory.
+ * Checks whether a thread-local array is acquired or not.
+ * The function is used by Unit tests.
*
- * @param addr Address.
+ * @return {@code true} if acquired {@code false} otherwise.
*/
- public void releaseDirect(long addr);
+ public boolean isAcquired() {
+ PortableMemoryAllocatorChunk holder = holders.get();
+
+ return holder != null && holder.isAcquired();
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java
new file mode 100644
index 0000000..35d58f7
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableMemoryAllocatorChunk.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.portable.streams;
+
+import org.apache.ignite.internal.util.GridUnsafe;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import sun.misc.Unsafe;
+
+import static org.apache.ignite.IgniteSystemProperties.IGNITE_MARSHAL_BUFFERS_RECHECK;
+
+/**
+ * Memory allocator chunk.
+ */
+public class PortableMemoryAllocatorChunk {
+ /** Unsafe instance. */
+ protected static final Unsafe UNSAFE = GridUnsafe.unsafe();
+
+ /** Array offset: byte. */
+ protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
+
+ /** Buffer size re-check frequency. */
+ private static final Long CHECK_FREQ = Long.getLong(IGNITE_MARSHAL_BUFFERS_RECHECK, 10000);
+
+ /** Data array */
+ private byte[] data;
+
+ /** Max message size detected between checks. */
+ private int maxMsgSize;
+
+ /** Last time array size is checked. */
+ private long lastCheck = U.currentTimeMillis();
+
+ /** Whether the holder is acquired or not. */
+ private boolean acquired;
+
+ /**
+ * Allocate.
+ *
+ * @param size Desired size.
+ * @return Data.
+ */
+ public byte[] allocate(int size) {
+ if (acquired)
+ return new byte[size];
+
+ acquired = true;
+
+ if (data == null || size > data.length)
+ data = new byte[size];
+
+ return data;
+ }
+
+ /**
+ * Reallocate.
+ *
+ * @param data Old data.
+ * @param size Size.
+ * @return New data.
+ */
+ public byte[] reallocate(byte[] data, int size) {
+ byte[] newData = new byte[size];
+
+ if (this.data == data)
+ this.data = newData;
+
+ UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
+
+ return newData;
+ }
+
+ /**
+ * Shrinks array size if needed.
+ */
+ public void release(byte[] data, int maxMsgSize) {
+ if (this.data != data)
+ return;
+
+ if (maxMsgSize > this.maxMsgSize)
+ this.maxMsgSize = maxMsgSize;
+
+ this.acquired = false;
+
+ long now = U.currentTimeMillis();
+
+ if (now - this.lastCheck >= CHECK_FREQ) {
+ int halfSize = data.length >> 1;
+
+ if (this.maxMsgSize < halfSize)
+ this.data = new byte[halfSize];
+
+ this.lastCheck = now;
+ }
+ }
+
+ /**
+ * @return {@code True} if acquired.
+ */
+ public boolean isAcquired() {
+ return acquired;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
index 430a176..cadd244 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOffheapOutputStream.java
@@ -125,24 +125,69 @@ public class PortableOffheapOutputStream extends PortableAbstractOutputStream {
}
/** {@inheritDoc} */
- @Override protected void writeShortPositioned(int pos, short val) {
+ @Override public boolean hasArray() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteByte(byte val) {
+ UNSAFE.putByte(ptr + pos++, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteShort(short val) {
if (!LITTLE_ENDIAN)
val = Short.reverseBytes(val);
UNSAFE.putShort(ptr + pos, val);
+
+ shift(2);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteShort(int pos, short val) {
+ if (!LITTLE_ENDIAN)
+ val = Short.reverseBytes(val);
+
+ UNSAFE.putShort(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteChar(char val) {
+ if (!LITTLE_ENDIAN)
+ val = Character.reverseBytes(val);
+
+ UNSAFE.putChar(ptr + pos, val);
+
+ shift(2);
}
/** {@inheritDoc} */
- @Override protected void writeIntPositioned(int pos, int val) {
+ @Override public void unsafeWriteInt(int val) {
if (!LITTLE_ENDIAN)
val = Integer.reverseBytes(val);
UNSAFE.putInt(ptr + pos, val);
+
+ shift(4);
}
/** {@inheritDoc} */
- @Override public boolean hasArray() {
- return false;
+ @Override public void unsafeWriteInt(int pos, int val) {
+ if (!LITTLE_ENDIAN)
+ val = Integer.reverseBytes(val);
+
+ UNSAFE.putInt(ptr + pos, val);
+ }
+
+ /** {@inheritDoc} */
+ @Override public void unsafeWriteLong(long val) {
+ if (!LITTLE_ENDIAN)
+ val = Long.reverseBytes(val);
+
+ UNSAFE.putLong(ptr + pos, val);
+
+ shift(8);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
index 0e25b12..3a2e9e1 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableOutputStream.java
@@ -170,4 +170,83 @@ public interface PortableOutputStream extends PortableStream, AutoCloseable {
* Close the stream releasing resources.
*/
@Override public void close();
+
+ /**
+ * Ensure capacity for unsafe writes.
+ *
+ * @param cap Capacity.
+ */
+ public void unsafeEnsure(int cap);
+
+ /**
+ * Write byte in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteByte(byte val);
+
+ /**
+ * Write boolean in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteBoolean(boolean val);
+
+ /**
+ * Write short in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteShort(short val);
+
+ /**
+ * Write short in unsafe mode.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void unsafeWriteShort(int pos, short val);
+
+ /**
+ * Write char in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteChar(char val);
+
+ /**
+ * Write int in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteInt(int val);
+
+ /**
+ * Write int in unsafe mode.
+ *
+ * @param pos Position.
+ * @param val Value.
+ */
+ public void unsafeWriteInt(int pos, int val);
+
+ /**
+ * Write long in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteLong(long val);
+
+ /**
+ * Write float in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteFloat(float val);
+
+ /**
+ * Write double in unsafe mode.
+ *
+ * @param val Value.
+ */
+ public void unsafeWriteDouble(double val);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4a1af37e/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
deleted file mode 100644
index 54d7b38..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/streams/PortableSimpleMemoryAllocator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.portable.streams;
-
-import org.apache.ignite.internal.util.GridUnsafe;
-import sun.misc.Unsafe;
-
-/**
- * Naive implementation of portable memory allocator.
- */
-public class PortableSimpleMemoryAllocator implements PortableMemoryAllocator {
- /** Unsafe. */
- private static final Unsafe UNSAFE = GridUnsafe.unsafe();
-
- /** Array offset: byte. */
- protected static final long BYTE_ARR_OFF = UNSAFE.arrayBaseOffset(byte[].class);
-
- /** {@inheritDoc} */
- @Override public byte[] allocate(int size) {
- return new byte[size];
- }
-
- /** {@inheritDoc} */
- @Override public byte[] reallocate(byte[] data, int size) {
- byte[] newData = new byte[size];
-
- UNSAFE.copyMemory(data, BYTE_ARR_OFF, newData, BYTE_ARR_OFF, data.length);
-
- return newData;
- }
-
- /** {@inheritDoc} */
- @Override public void release(byte[] data, int maxMsgSize) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public long allocateDirect(int size) {
- return UNSAFE.allocateMemory(size);
- }
-
- /** {@inheritDoc} */
- @Override public long reallocateDirect(long addr, int size) {
- return UNSAFE.reallocateMemory(addr, size);
- }
-
- /** {@inheritDoc} */
- @Override public void releaseDirect(long addr) {
- UNSAFE.freeMemory(addr);
- }
-}
\ No newline at end of file