You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/11 14:17:39 UTC
[2/3] ignite git commit: IGNITE-1816: Reworked metadata updates.
IGNITE-1816: Reworked metadata updates.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b00502a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b00502a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b00502a6
Branch: refs/heads/ignite-1816
Commit: b00502a69b109ab702b3e2c90ec55e4612308935
Parents: 9e79260
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Nov 11 15:33:59 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Nov 11 15:33:59 2015 +0300
----------------------------------------------------------------------
.../internal/portable/BinaryMetadata.java | 4 +-
.../internal/portable/PortableSchema.java | 10 ++
.../CacheObjectBinaryProcessorImpl.java | 169 ++++++++-----------
.../marshaller/portable/PortableMarshaller.java | 3 +-
4 files changed, 88 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/b00502a6/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java
index e7eb70f..a464d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java
@@ -127,8 +127,8 @@ public class BinaryMetadata implements Externalizable {
/**
* @return Schemas.
*/
- @Nullable public Collection<PortableSchema> schemas() {
- return schemas;
+ public Collection<PortableSchema> schemas() {
+ return schemas != null ? schemas : Collections.<PortableSchema>emptyList();
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/b00502a6/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
index a670b78..f6838c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
@@ -154,6 +154,16 @@ public class PortableSchema implements Serializable {
}
}
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return schemaId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ return o != null && o instanceof PortableSchema && schemaId == ((PortableSchema)o).schemaId;
+ }
+
/**
* Schema builder.
*/
http://git-wip-us.apache.org/repos/asf/ignite/blob/b00502a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
index f481963..01ec71c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.portable.BinaryTypeImpl;
import org.apache.ignite.internal.portable.GridPortableMarshaller;
import org.apache.ignite.internal.portable.PortableContext;
import org.apache.ignite.internal.portable.BinaryMetadataHandler;
+import org.apache.ignite.internal.portable.PortableSchema;
import org.apache.ignite.internal.portable.PortableUtils;
import org.apache.ignite.internal.portable.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.portable.streams.PortableInputStream;
@@ -89,6 +90,7 @@ import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -168,17 +170,14 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (metaDataCache == null) {
BinaryMetadata oldMeta = metaBuf.get(typeId);
+ BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0);
- if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta0, null)) {
+ if (oldMeta != mergedMeta) {
synchronized (this) {
- Map<String, Integer> fields = new HashMap<>();
+ mergedMeta = mergeMetadata(oldMeta, newMeta0);
- if (checkMeta(typeId, oldMeta, newMeta0, fields)) {
- newMeta0 = new BinaryMetadata(typeId, newMeta0.typeName(), fields,
- newMeta0.affinityKeyFieldName(), newMeta0.schemas());
-
- metaBuf.put(typeId, newMeta0);
- }
+ if (oldMeta != mergedMeta)
+ metaBuf.put(typeId, mergedMeta);
else
return;
}
@@ -188,8 +187,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
else
metaBuf.remove(typeId);
}
- else
- return;
}
CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta0.wrap(portableCtx));
@@ -297,24 +294,22 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
private void addClientCacheMetaData(PortableMetaDataKey key, final BinaryMetadata newMeta) {
int key0 = key.typeId();
- clientMetaDataCache.compute(key0,
- new ConcurrentHashMap8.BiFun<Integer, BinaryTypeImpl, BinaryTypeImpl>() {
- @Override public BinaryTypeImpl apply(Integer key, BinaryTypeImpl oldMeta) {
- BinaryMetadata res;
-
- BinaryMetadata oldMeta0 = oldMeta != null ? oldMeta.metadata() : null;
+ clientMetaDataCache.compute(key0, new ConcurrentHashMap8.BiFun<Integer, BinaryTypeImpl, BinaryTypeImpl>() {
+ @Override public BinaryTypeImpl apply(Integer key, BinaryTypeImpl oldMeta) {
+ BinaryMetadata res;
- try {
- res = checkMeta(key, oldMeta0, newMeta, null) ? newMeta : oldMeta0;
- }
- catch (BinaryObjectException e) {
- res = oldMeta0;
- }
+ BinaryMetadata oldMeta0 = oldMeta != null ? oldMeta.metadata() : null;
- return res != null ? res.wrap(portableCtx) : null;
+ try {
+ res = mergeMetadata(oldMeta0, newMeta);
+ }
+ catch (BinaryObjectException e) {
+ res = oldMeta0;
}
+
+ return res != null ? res.wrap(portableCtx) : null;
}
- );
+ });
}
/** {@inheritDoc} */
@@ -464,13 +459,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
try {
BinaryMetadata oldMeta = metaDataCache.localPeek(key);
+ BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0);
- if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta0, null)) {
- BinaryObjectException err = metaDataCache.invoke(key, new MetaDataProcessor(typeId, newMeta0));
+ BinaryObjectException err = metaDataCache.invoke(key, new MetadataProcessor(mergedMeta));
- if (err != null)
- throw err;
- }
+ if (err != null)
+ throw err;
}
catch (CacheException e) {
throw new BinaryObjectException("Failed to update meta data for type: " + newMeta.typeName(), e);
@@ -727,125 +721,112 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
/**
- * @param typeId Type ID.
+ * Merge old and new metas.
+ *
* @param oldMeta Old meta.
* @param newMeta New meta.
- * @param fields Fields map.
- * @return Whether meta is changed.
- * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ * @return New meta if old meta was null, old meta if no changes detected, merged meta otherwise.
+ * @throws BinaryObjectException If merge failed due to metadata conflict.
*/
- private static boolean checkMeta(int typeId, @Nullable BinaryMetadata oldMeta,
- BinaryMetadata newMeta, @Nullable Map<String, Integer> fields) throws BinaryObjectException {
+ private static BinaryMetadata mergeMetadata(@Nullable BinaryMetadata oldMeta, BinaryMetadata newMeta) {
assert newMeta != null;
- Map<String, Integer> oldFields = oldMeta != null ? oldMeta.fieldsMap() : null;
- Map<String, Integer> newFields = newMeta.fieldsMap();
-
- boolean changed = false;
+ if (oldMeta == null)
+ return newMeta;
+ else {
+ assert oldMeta.typeId() == newMeta.typeId();
- if (oldMeta != null) {
- if (!oldMeta.typeName().equals(newMeta.typeName())) {
+ // Check type name.
+ if (!F.eq(oldMeta.typeName(), newMeta.typeName())) {
throw new BinaryObjectException(
- "Two portable types have duplicate type ID [" +
- "typeId=" + typeId +
- ", typeName1=" + oldMeta.typeName() +
- ", typeName2=" + newMeta.typeName() +
- ']'
+ "Two portable types have duplicate type ID [" + "typeId=" + oldMeta.typeId() +
+ ", typeName1=" + oldMeta.typeName() + ", typeName2=" + newMeta.typeName() + ']'
);
}
+ // Check affinity field names.
if (!F.eq(oldMeta.affinityKeyFieldName(), newMeta.affinityKeyFieldName())) {
throw new BinaryObjectException(
- "Portable type has different affinity key fields on different clients [" +
- "typeName=" + newMeta.typeName() +
+ "Binary type has different affinity key fields [" + "typeName=" + newMeta.typeName() +
", affKeyFieldName1=" + oldMeta.affinityKeyFieldName() +
- ", affKeyFieldName2=" + newMeta.affinityKeyFieldName() +
- ']'
+ ", affKeyFieldName2=" + newMeta.affinityKeyFieldName() + ']'
);
}
- if (fields != null)
- fields.putAll(oldFields);
- }
- else
- changed = true;
+ // Check and merge fields.
+ boolean changed = false;
+
+ Map<String, Integer> mergedFields = new HashMap<>(oldMeta.fieldsMap());
+ Map<String, Integer> newFields = newMeta.fieldsMap();
- for (Map.Entry<String, Integer> e : newFields.entrySet()) {
- Integer oldTypeId = oldFields != null ? oldFields.get(e.getKey()) : null;
+ for (Map.Entry<String, Integer> newField : newFields.entrySet()) {
+ Integer oldFieldType = mergedFields.put(newField.getKey(), newField.getValue());
- if (oldTypeId != null) {
- if (!oldTypeId.equals(e.getValue())) {
+ if (oldFieldType == null)
+ changed = true;
+ else if (!F.eq(oldFieldType, newField.getValue())) {
throw new BinaryObjectException(
- "Portable field has different types on different clients [" +
- "typeName=" + newMeta.typeName() +
- ", fieldName=" + e.getKey() +
- ", fieldTypeName1=" + PortableUtils.fieldTypeName(oldTypeId) +
- ", fieldTypeName2=" + PortableUtils.fieldTypeName(e.getValue()) +
- ']'
+ "Binary type has different field types [" + "typeName=" + oldMeta.typeName() +
+ ", fieldName=" + newField.getKey() +
+ ", fieldTypeName1=" + PortableUtils.fieldTypeName(oldFieldType) +
+ ", fieldTypeName2=" + PortableUtils.fieldTypeName(newField.getValue()) + ']'
);
}
}
- else {
- if (fields != null)
- fields.put(e.getKey(), e.getValue());
- changed = true;
+ // Check and merge schemas.
+ Collection<PortableSchema> mergedSchemas = new HashSet<>(oldMeta.schemas());
+
+ for (PortableSchema newSchema : newMeta.schemas()) {
+ if (mergedSchemas.add(newSchema))
+ changed = true;
}
- }
- return changed;
+ // Return either old meta if no changes detected, or new merged meta.
+ return changed ? new BinaryMetadata(oldMeta.typeId(), oldMeta.typeName(), mergedFields,
+ oldMeta.affinityKeyFieldName(), mergedSchemas) : oldMeta;
+ }
}
/**
+ * Processor responsible for metadata update.
*/
- private static class MetaDataProcessor implements
- EntryProcessor<PortableMetaDataKey, BinaryMetadata, BinaryObjectException>, Externalizable {
+ private static class MetadataProcessor
+ implements EntryProcessor<PortableMetaDataKey, BinaryMetadata, BinaryObjectException>, Externalizable {
/** */
private static final long serialVersionUID = 0L;
/** */
- private int typeId;
-
- /** */
private BinaryMetadata newMeta;
/**
* For {@link Externalizable}.
*/
- public MetaDataProcessor() {
+ public MetadataProcessor() {
// No-op.
}
/**
- * @param typeId Type ID.
* @param newMeta New metadata.
*/
- private MetaDataProcessor(int typeId, BinaryMetadata newMeta) {
+ private MetadataProcessor(BinaryMetadata newMeta) {
assert newMeta != null;
- this.typeId = typeId;
this.newMeta = newMeta;
}
/** {@inheritDoc} */
- @Override public BinaryObjectException process(
- MutableEntry<PortableMetaDataKey, BinaryMetadata> entry,
+ @Override public BinaryObjectException process(MutableEntry<PortableMetaDataKey, BinaryMetadata> entry,
Object... args) {
try {
BinaryMetadata oldMeta = entry.getValue();
- Map<String, Integer> fields = new HashMap<>();
+ BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta);
- if (checkMeta(typeId, oldMeta, newMeta, fields)) {
- BinaryMetadata res = new BinaryMetadata(typeId, newMeta.typeName(), fields,
- newMeta.affinityKeyFieldName(), newMeta.schemas());
+ if (mergedMeta != oldMeta)
+ entry.setValue(mergedMeta);
- entry.setValue(res);
-
- return null;
- }
- else
- return null;
+ return null;
}
catch (BinaryObjectException e) {
return e;
@@ -854,19 +835,17 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
/** {@inheritDoc} */
@Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(typeId);
out.writeObject(newMeta);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- typeId = in.readInt();
newMeta = (BinaryMetadata)in.readObject();
}
/** {@inheritDoc} */
@Override public String toString() {
- return S.toString(MetaDataProcessor.class, this);
+ return S.toString(MetadataProcessor.class, this);
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/b00502a6/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
index 1704c8a..4e8217d 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
@@ -78,7 +78,8 @@ public class PortableMarshaller extends AbstractMarshaller {
public static final boolean DFLT_KEEP_DESERIALIZED = true;
/** Default value of "compact footer" flag. */
- public static final boolean DFLT_COMPACT_FOOTER = true;
+ // TODO: Set to true.
+ public static final boolean DFLT_COMPACT_FOOTER = false;
// TODO ignite-1282 Move to IgniteConfiguration.
/** Class names. */