You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2015/11/11 14:17:39 UTC

[2/3] ignite git commit: IGNITE-1816: Reworked metadata updates.

IGNITE-1816: Reworked metadata updates.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/b00502a6
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/b00502a6
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/b00502a6

Branch: refs/heads/ignite-1816
Commit: b00502a69b109ab702b3e2c90ec55e4612308935
Parents: 9e79260
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Wed Nov 11 15:33:59 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Wed Nov 11 15:33:59 2015 +0300

----------------------------------------------------------------------
 .../internal/portable/BinaryMetadata.java       |   4 +-
 .../internal/portable/PortableSchema.java       |  10 ++
 .../CacheObjectBinaryProcessorImpl.java         | 169 ++++++++-----------
 .../marshaller/portable/PortableMarshaller.java |   3 +-
 4 files changed, 88 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/b00502a6/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java
index e7eb70f..a464d6e 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/BinaryMetadata.java
@@ -127,8 +127,8 @@ public class BinaryMetadata implements Externalizable {
     /**
      * @return Schemas.
      */
-    @Nullable public Collection<PortableSchema> schemas() {
-        return schemas;
+    public Collection<PortableSchema> schemas() {
+        return schemas != null ? schemas : Collections.<PortableSchema>emptyList();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/ignite/blob/b00502a6/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
index a670b78..f6838c0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/portable/PortableSchema.java
@@ -154,6 +154,16 @@ public class PortableSchema implements Serializable {
         }
     }
 
+    /** {@inheritDoc} */
+    @Override public int hashCode() {
+        return schemaId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean equals(Object o) {
+        return o != null && o instanceof PortableSchema && schemaId == ((PortableSchema)o).schemaId;
+    }
+
     /**
      * Schema builder.
      */

http://git-wip-us.apache.org/repos/asf/ignite/blob/b00502a6/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
index f481963..01ec71c 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/portable/CacheObjectBinaryProcessorImpl.java
@@ -37,6 +37,7 @@ import org.apache.ignite.internal.portable.BinaryTypeImpl;
 import org.apache.ignite.internal.portable.GridPortableMarshaller;
 import org.apache.ignite.internal.portable.PortableContext;
 import org.apache.ignite.internal.portable.BinaryMetadataHandler;
+import org.apache.ignite.internal.portable.PortableSchema;
 import org.apache.ignite.internal.portable.PortableUtils;
 import org.apache.ignite.internal.portable.builder.BinaryObjectBuilderImpl;
 import org.apache.ignite.internal.portable.streams.PortableInputStream;
@@ -89,6 +90,7 @@ import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -168,17 +170,14 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
                     if (metaDataCache == null) {
                         BinaryMetadata oldMeta = metaBuf.get(typeId);
+                        BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0);
 
-                        if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta0, null)) {
+                        if (oldMeta != mergedMeta) {
                             synchronized (this) {
-                                Map<String, Integer> fields = new HashMap<>();
+                                mergedMeta = mergeMetadata(oldMeta, newMeta0);
 
-                                if (checkMeta(typeId, oldMeta, newMeta0, fields)) {
-                                    newMeta0 = new BinaryMetadata(typeId, newMeta0.typeName(), fields,
-                                        newMeta0.affinityKeyFieldName(), newMeta0.schemas());
-
-                                    metaBuf.put(typeId, newMeta0);
-                                }
+                                if (oldMeta != mergedMeta)
+                                    metaBuf.put(typeId, mergedMeta);
                                 else
                                     return;
                             }
@@ -188,8 +187,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
                             else
                                 metaBuf.remove(typeId);
                         }
-                        else
-                            return;
                     }
 
                     CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta0.wrap(portableCtx));
@@ -297,24 +294,22 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     private void addClientCacheMetaData(PortableMetaDataKey key, final BinaryMetadata newMeta) {
         int key0 = key.typeId();
 
-        clientMetaDataCache.compute(key0,
-            new ConcurrentHashMap8.BiFun<Integer, BinaryTypeImpl, BinaryTypeImpl>() {
-                @Override public BinaryTypeImpl apply(Integer key, BinaryTypeImpl oldMeta) {
-                    BinaryMetadata res;
-
-                    BinaryMetadata oldMeta0 = oldMeta != null ? oldMeta.metadata() : null;
+        clientMetaDataCache.compute(key0, new ConcurrentHashMap8.BiFun<Integer, BinaryTypeImpl, BinaryTypeImpl>() {
+            @Override public BinaryTypeImpl apply(Integer key, BinaryTypeImpl oldMeta) {
+                BinaryMetadata res;
 
-                    try {
-                        res = checkMeta(key, oldMeta0, newMeta, null) ? newMeta : oldMeta0;
-                    }
-                    catch (BinaryObjectException e) {
-                        res = oldMeta0;
-                    }
+                BinaryMetadata oldMeta0 = oldMeta != null ? oldMeta.metadata() : null;
 
-                    return res != null ? res.wrap(portableCtx) : null;
+                try {
+                    res = mergeMetadata(oldMeta0, newMeta);
+                }
+                catch (BinaryObjectException e) {
+                    res = oldMeta0;
                 }
+
+                return res != null ? res.wrap(portableCtx) : null;
             }
-        );
+        });
     }
 
     /** {@inheritDoc} */
@@ -464,13 +459,12 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
         try {
             BinaryMetadata oldMeta = metaDataCache.localPeek(key);
+            BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0);
 
-            if (oldMeta == null || checkMeta(typeId, oldMeta, newMeta0, null)) {
-                BinaryObjectException err = metaDataCache.invoke(key, new MetaDataProcessor(typeId, newMeta0));
+            BinaryObjectException err = metaDataCache.invoke(key, new MetadataProcessor(mergedMeta));
 
-                if (err != null)
-                    throw err;
-            }
+            if (err != null)
+                throw err;
         }
         catch (CacheException e) {
             throw new BinaryObjectException("Failed to update meta data for type: " + newMeta.typeName(), e);
@@ -727,125 +721,112 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
     }
 
     /**
-     * @param typeId Type ID.
+     * Merge old and new metas.
+     *
      * @param oldMeta Old meta.
      * @param newMeta New meta.
-     * @param fields Fields map.
-     * @return Whether meta is changed.
-     * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+     * @return New meta if old meta was null, old meta if no changes detected, merged meta otherwise.
+     * @throws BinaryObjectException If merge failed due to metadata conflict.
      */
-    private static boolean checkMeta(int typeId, @Nullable BinaryMetadata oldMeta,
-        BinaryMetadata newMeta, @Nullable Map<String, Integer> fields) throws BinaryObjectException {
+    private static BinaryMetadata mergeMetadata(@Nullable BinaryMetadata oldMeta, BinaryMetadata newMeta) {
         assert newMeta != null;
 
-        Map<String, Integer> oldFields = oldMeta != null ? oldMeta.fieldsMap() : null;
-        Map<String, Integer> newFields = newMeta.fieldsMap();
-
-        boolean changed = false;
+        if (oldMeta == null)
+            return newMeta;
+        else {
+            assert oldMeta.typeId() == newMeta.typeId();
 
-        if (oldMeta != null) {
-            if (!oldMeta.typeName().equals(newMeta.typeName())) {
+            // Check type name.
+            if (!F.eq(oldMeta.typeName(), newMeta.typeName())) {
                 throw new BinaryObjectException(
-                    "Two portable types have duplicate type ID [" +
-                        "typeId=" + typeId +
-                        ", typeName1=" + oldMeta.typeName() +
-                        ", typeName2=" + newMeta.typeName() +
-                        ']'
+                    "Two portable types have duplicate type ID [" + "typeId=" + oldMeta.typeId() +
+                        ", typeName1=" + oldMeta.typeName() + ", typeName2=" + newMeta.typeName() + ']'
                 );
             }
 
+            // Check affinity field names.
             if (!F.eq(oldMeta.affinityKeyFieldName(), newMeta.affinityKeyFieldName())) {
                 throw new BinaryObjectException(
-                    "Portable type has different affinity key fields on different clients [" +
-                        "typeName=" + newMeta.typeName() +
+                    "Binary type has different affinity key fields [" + "typeName=" + newMeta.typeName() +
                         ", affKeyFieldName1=" + oldMeta.affinityKeyFieldName() +
-                        ", affKeyFieldName2=" + newMeta.affinityKeyFieldName() +
-                        ']'
+                        ", affKeyFieldName2=" + newMeta.affinityKeyFieldName() + ']'
                 );
             }
 
-            if (fields != null)
-                fields.putAll(oldFields);
-        }
-        else
-            changed = true;
+            // Check and merge fields.
+            boolean changed = false;
+
+            Map<String, Integer> mergedFields = new HashMap<>(oldMeta.fieldsMap());
+            Map<String, Integer> newFields = newMeta.fieldsMap();
 
-        for (Map.Entry<String, Integer> e : newFields.entrySet()) {
-            Integer oldTypeId = oldFields != null ? oldFields.get(e.getKey()) : null;
+            for (Map.Entry<String, Integer> newField : newFields.entrySet()) {
+                Integer oldFieldType = mergedFields.put(newField.getKey(), newField.getValue());
 
-            if (oldTypeId != null) {
-                if (!oldTypeId.equals(e.getValue())) {
+                if (oldFieldType == null)
+                    changed = true;
+                else if (!F.eq(oldFieldType, newField.getValue())) {
                     throw new BinaryObjectException(
-                        "Portable field has different types on different clients [" +
-                            "typeName=" + newMeta.typeName() +
-                            ", fieldName=" + e.getKey() +
-                            ", fieldTypeName1=" + PortableUtils.fieldTypeName(oldTypeId) +
-                            ", fieldTypeName2=" + PortableUtils.fieldTypeName(e.getValue()) +
-                            ']'
+                        "Binary type has different field types [" + "typeName=" + oldMeta.typeName() +
+                            ", fieldName=" + newField.getKey() +
+                            ", fieldTypeName1=" + PortableUtils.fieldTypeName(oldFieldType) +
+                            ", fieldTypeName2=" + PortableUtils.fieldTypeName(newField.getValue()) + ']'
                     );
                 }
             }
-            else {
-                if (fields != null)
-                    fields.put(e.getKey(), e.getValue());
 
-                changed = true;
+            // Check and merge schemas.
+            Collection<PortableSchema> mergedSchemas = new HashSet<>(oldMeta.schemas());
+
+            for (PortableSchema newSchema : newMeta.schemas()) {
+                if (mergedSchemas.add(newSchema))
+                    changed = true;
             }
-        }
 
-        return changed;
+            // Return either old meta if no changes detected, or new merged meta.
+            return changed ? new BinaryMetadata(oldMeta.typeId(), oldMeta.typeName(), mergedFields,
+                oldMeta.affinityKeyFieldName(), mergedSchemas) : oldMeta;
+        }
     }
 
     /**
+     * Processor responsible for metadata update.
      */
-    private static class MetaDataProcessor implements
-        EntryProcessor<PortableMetaDataKey, BinaryMetadata, BinaryObjectException>, Externalizable {
+    private static class MetadataProcessor
+        implements EntryProcessor<PortableMetaDataKey, BinaryMetadata, BinaryObjectException>, Externalizable {
         /** */
         private static final long serialVersionUID = 0L;
 
         /** */
-        private int typeId;
-
-        /** */
         private BinaryMetadata newMeta;
 
         /**
          * For {@link Externalizable}.
          */
-        public MetaDataProcessor() {
+        public MetadataProcessor() {
             // No-op.
         }
 
         /**
-         * @param typeId Type ID.
          * @param newMeta New metadata.
          */
-        private MetaDataProcessor(int typeId, BinaryMetadata newMeta) {
+        private MetadataProcessor(BinaryMetadata newMeta) {
             assert newMeta != null;
 
-            this.typeId = typeId;
             this.newMeta = newMeta;
         }
 
         /** {@inheritDoc} */
-        @Override public BinaryObjectException process(
-            MutableEntry<PortableMetaDataKey, BinaryMetadata> entry,
+        @Override public BinaryObjectException process(MutableEntry<PortableMetaDataKey, BinaryMetadata> entry,
             Object... args) {
             try {
                 BinaryMetadata oldMeta = entry.getValue();
 
-                Map<String, Integer> fields = new HashMap<>();
+                BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta);
 
-                if (checkMeta(typeId, oldMeta, newMeta, fields)) {
-                    BinaryMetadata res = new BinaryMetadata(typeId, newMeta.typeName(), fields,
-                        newMeta.affinityKeyFieldName(), newMeta.schemas());
+                if (mergedMeta != oldMeta)
+                    entry.setValue(mergedMeta);
 
-                    entry.setValue(res);
-
-                    return null;
-                }
-                else
-                    return null;
+                return null;
             }
             catch (BinaryObjectException e) {
                 return e;
@@ -854,19 +835,17 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
 
         /** {@inheritDoc} */
         @Override public void writeExternal(ObjectOutput out) throws IOException {
-            out.writeInt(typeId);
             out.writeObject(newMeta);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-            typeId = in.readInt();
             newMeta = (BinaryMetadata)in.readObject();
         }
 
         /** {@inheritDoc} */
         @Override public String toString() {
-            return S.toString(MetaDataProcessor.class, this);
+            return S.toString(MetadataProcessor.class, this);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/ignite/blob/b00502a6/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
index 1704c8a..4e8217d 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/portable/PortableMarshaller.java
@@ -78,7 +78,8 @@ public class PortableMarshaller extends AbstractMarshaller {
     public static final boolean DFLT_KEEP_DESERIALIZED = true;
 
     /** Default value of "compact footer" flag. */
-    public static final boolean DFLT_COMPACT_FOOTER = true;
+    // TODO: Set to true.
+    public static final boolean DFLT_COMPACT_FOOTER = false;
 
     // TODO ignite-1282 Move to IgniteConfiguration.
     /** Class names. */