You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/27 14:47:21 UTC

[ignite] branch master updated: IGNITE-11605 Recheck metadata difference after put pending future - Fixes #6324.

This is an automated email from the ASF dual-hosted git repository.

agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git


The following commit(s) were added to refs/heads/master by this push:
     new 30a3d23  IGNITE-11605 Recheck metadata difference after put pending future - Fixes #6324.
30a3d23 is described below

commit 30a3d2361ad93e61b1036d2b3ff916ece13db904
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Mar 27 17:29:04 2019 +0300

    IGNITE-11605 Recheck metadata difference after put pending future - Fixes #6324.
    
    Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
 .../cache/binary/BinaryMetadataTransport.java      | 97 +++++++++++++++++-----
 .../binary/CacheObjectBinaryProcessorImpl.java     | 94 ++++++++++-----------
 .../cache/BinaryTypeRegistrationTest.java          |  2 +-
 3 files changed, 121 insertions(+), 72 deletions(-)

diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index d2fe972..0d2f6f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -20,6 +20,7 @@ import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Queue;
 import java.util.Set;
 import java.util.UUID;
@@ -37,7 +38,6 @@ import org.apache.ignite.internal.GridKernalContext;
 import org.apache.ignite.internal.GridTopic;
 import org.apache.ignite.internal.IgniteInternalFuture;
 import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.apache.ignite.internal.binary.BinaryUtils;
 import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
 import org.apache.ignite.internal.managers.communication.GridIoManager;
 import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -53,6 +53,7 @@ import org.jetbrains.annotations.Nullable;
 
 import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
 import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
 import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
 
 /**
@@ -163,27 +164,66 @@ final class BinaryMetadataTransport {
     /**
      * Sends request to cluster proposing update for given metadata.
      *
-     * @param metadata Metadata proposed for update.
+     * @param newMeta Metadata proposed for update.
      * @return Future to wait for update result on.
      */
-    GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) {
-        MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture(metadata.typeId());
+    GridFutureAdapter<MetadataUpdateResult>  requestMetadataUpdate(BinaryMetadata newMeta) {
+        int typeId = newMeta.typeId();
 
-        if (log.isDebugEnabled())
-            log.debug("Requesting metadata update for " + metadata.typeId() + "; caller thread is blocked on future "
-                + resFut);
+        MetadataUpdateResultFuture resFut;
 
-        MetadataUpdateResultFuture oldFut = pendingTypeIdMap.putIfAbsent(metadata.typeId(), resFut);
+        do {
+            BinaryMetadataHolder metaHolder = metaLocCache.get(typeId);
+
+            BinaryMetadata oldMeta = Optional.ofNullable(metaHolder)
+                .map(BinaryMetadataHolder::metadata)
+                .orElse(null);
+
+            BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, null);
+
+            if (mergedMeta == oldMeta) {
+                if (metaHolder.pendingVersion() == metaHolder.acceptedVersion())
+                    return null;
+
+                return awaitMetadataUpdate(typeId, metaHolder.pendingVersion());
+            }
+
+            resFut = new MetadataUpdateResultFuture(typeId);
+        }
+        while (!putAndWaitPendingUpdate(typeId, resFut));
+
+        BinaryMetadataHolder metadataHolder = metaLocCache.get(typeId);
+
+        BinaryMetadata oldMeta = Optional.ofNullable(metadataHolder)
+            .map(BinaryMetadataHolder::metadata)
+            .orElse(null);
+
+        Set<Integer> changedSchemas  = new LinkedHashSet<>();
+
+        // Ensure that the metadata still differs after the pending future has been put.
+        BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, changedSchemas);
+
+        if (mergedMeta == oldMeta) {
+            resFut.onDone(MetadataUpdateResult.createSuccessfulResult());
 
-        if(oldFut != null)
             return null;
+        }
+
+        if (log.isDebugEnabled()) {
+            log.debug("Requesting metadata update [typeId=" + typeId +
+                ", typeName=" + mergedMeta.typeName() +
+                ", changedSchemas=" + changedSchemas +
+                ", holder=" + metadataHolder +
+                ", fut=" + resFut +
+                ']');
+        }
 
         try {
             synchronized (this) {
                 unlabeledFutures.add(resFut);
 
                 if (!stopping)
-                    discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, ctx.localNodeId()));
+                    discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(mergedMeta, ctx.localNodeId()));
                 else
                     resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
             }
@@ -199,6 +239,31 @@ final class BinaryMetadataTransport {
     }
 
     /**
+     * Puts the new update future and, if a pending future already exists, waits for it to complete.
+     *
+     * @param typeId Type id.
+     * @param metaUpdateFut New metadata update future.
+     * @return {@code true} if the given future was put successfully.
+     */
+    private boolean putAndWaitPendingUpdate(int typeId, MetadataUpdateResultFuture metaUpdateFut) {
+        MetadataUpdateResultFuture oldFut = pendingTypeIdMap.putIfAbsent(typeId, metaUpdateFut);
+
+        if (oldFut != null) {
+            try {
+                oldFut.get();
+            }
+            catch (IgniteCheckedException ignore) {
+                // The stack trace will be logged in the thread that created this future.
+                log.warning("Pending update metadata process was failed. Trying to update to new metadata.");
+            }
+
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
      * Allows thread to wait for a metadata of given typeId and version to be accepted by the cluster.
      *
      * @param typeId ID of binary type.
@@ -223,14 +288,6 @@ final class BinaryMetadataTransport {
     }
 
     /**
-     * @param typeId Type id.
-     * @return Pending meta update future.
-     */
-    GridFutureAdapter<MetadataUpdateResult> getPendingMetaUpdate(int typeId) {
-        return pendingTypeIdMap.get(typeId);
-    }
-
-    /**
      * Await specific schema update.
      * @param typeId Type id.
      * @param schemaId Schema id.
@@ -331,7 +388,7 @@ final class BinaryMetadataTransport {
                 try {
                     Set<Integer> changedSchemas = new LinkedHashSet<>();
 
-                    BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas);
+                    BinaryMetadata mergedMeta = mergeMetadata(locMeta, msg.metadata(), changedSchemas);
 
                     if (log.isDebugEnabled())
                         log.debug("Versions are stamped on coordinator" +
@@ -410,7 +467,7 @@ final class BinaryMetadataTransport {
                     Set<Integer> changedSchemas = new LinkedHashSet<>();
 
                     try {
-                        BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas);
+                        BinaryMetadata mergedMeta = mergeMetadata(locMeta, msg.metadata(), changedSchemas);
 
                         BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
 
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 3dc4b45..4cce520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.processors.cache.binary;
 
+import javax.cache.CacheException;
 import java.io.File;
 import java.io.Serializable;
 import java.math.BigDecimal;
@@ -26,14 +27,11 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import javax.cache.CacheException;
 import org.apache.ignite.IgniteBinary;
 import org.apache.ignite.IgniteCheckedException;
 import org.apache.ignite.IgniteClientDisconnectedException;
@@ -121,6 +119,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION
 import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_SCHEMA_UPDATE;
 import static org.apache.ignite.IgniteSystemProperties.getBoolean;
 import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC;
+import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
 
 /**
  * Binary processor implementation.
@@ -223,9 +222,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
 
                         BinaryMetadata oldMeta = holder != null ? holder.metadata() : null;
 
-                        BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(
-                            oldMeta, ((BinaryTypeImpl)newMeta).metadata()
-                        );
+                        BinaryMetadata mergedMeta = mergeMetadata(oldMeta, ((BinaryTypeImpl)newMeta).metadata());
 
                         if (oldMeta != mergedMeta)
                             metadataLocCache.put(typeId, new BinaryMetadataHolder(mergedMeta, 0, 0));
@@ -536,52 +533,23 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
 
         BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
 
-        try {
-            GridFutureAdapter<MetadataUpdateResult> fut;
-            Set<Integer> changedSchemas;
-            BinaryMetadataHolder metaHolder;
-
-            do {
-                metaHolder = metadataLocCache.get(typeId);
-
-                BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
-
-                changedSchemas = new LinkedHashSet<>();
-
-                BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0, changedSchemas);
-
-                if (mergedMeta != oldMeta) {
-                    if (failIfUnregistered)
-                        throw new UnregisteredBinaryTypeException(typeId, mergedMeta);
+        if (failIfUnregistered) {
+            failIfUnregistered(typeId, newMeta0);
 
-                    fut = transport.requestMetadataUpdate(mergedMeta);
-                }
-                else {
-                    if (metaHolder.pendingVersion() == metaHolder.acceptedVersion())
-                        return;
+            return;
+        }
 
-                    // Metadata locally is up-to-date. Waiting for updating metadata in an entire cluster, if necessary.
-                    fut = transport.awaitMetadataUpdate(typeId, metaHolder.pendingVersion());
+        try {
+            GridFutureAdapter<MetadataUpdateResult> fut = transport.requestMetadataUpdate(newMeta0);
 
-                    if (failIfUnregistered && !fut.isDone())
-                        throw new UnregisteredBinaryTypeException(typeId, fut);
+            if (fut == null) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Metadata update was skipped [typeId=" + typeId
+                        + ", typeName=" + newMeta.typeName() + ']');
                 }
 
-                if (fut == null) {
-                    GridFutureAdapter<MetadataUpdateResult> pending = transport.getPendingMetaUpdate(typeId);
-
-                    if (pending != null) {
-                        try {
-                            pending.get();
-                        }
-                        catch (IgniteCheckedException ignore) {
-                            //Stacktrace will be logged in thread which created this future.
-                            log.warning("Pending update metadata process was failed. Trying to update to new metadata.");
-                        }
-                    }
-                }
+                return;
             }
-            while (fut == null);
 
             long t0 = System.nanoTime();
 
@@ -592,9 +560,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
 
                 log.debug("Completed metadata update [typeId=" + typeId +
                     ", typeName=" + newMeta.typeName() +
-                    ", changedSchemas=" + changedSchemas +
                     ", waitTime=" + MILLISECONDS.convert(System.nanoTime() - t0, NANOSECONDS) + "ms" +
-                    ", holder=" + metaHolder +
                     ", fut=" + fut +
                     ", tx=" + CU.txString(tx) +
                     ']');
@@ -618,6 +584,32 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
         }
     }
 
+    /**
+     * Throws {@link UnregisteredBinaryTypeException} if the given metadata is unregistered or its update is not done.
+     *
+     * @param typeId Type id.
+     * @param newMeta0 Expected binary metadata.
+     */
+    private void failIfUnregistered(int typeId, BinaryMetadata newMeta0) {
+        BinaryMetadataHolder metaHolder = metadataLocCache.get(typeId);
+
+        BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
+
+        BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0);
+
+        if (mergedMeta != oldMeta)
+            throw new UnregisteredBinaryTypeException(typeId, mergedMeta);
+
+        if (metaHolder.pendingVersion() == metaHolder.acceptedVersion())
+            return;
+
+        // Metadata locally is up-to-date. Waiting for updating metadata in an entire cluster, if necessary.
+        GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(typeId, metaHolder.pendingVersion());
+
+        if (!fut.isDone())
+            throw new UnregisteredBinaryTypeException(typeId, fut);
+    }
+
     /** {@inheritDoc} */
     @Override public void addMetaLocally(int typeId, BinaryType newMeta) throws BinaryObjectException {
         assert newMeta != null;
@@ -630,7 +622,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
         BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
 
         try {
-            BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+            BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0);
 
             if (!ctx.clientNode())
                 metadataFileStore.mergeAndWriteMetadata(mergedMeta);
@@ -1302,7 +1294,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
                 continue;
 
             try {
-                BinaryUtils.mergeMetadata(locMeta, rmtMeta);
+                mergeMetadata(locMeta, rmtMeta);
             }
             catch (Exception e) {
                 String locMsg = "Exception was thrown when merging binary metadata from node %s: %s";
@@ -1361,7 +1353,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
                 BinaryMetadata newMeta = metaEntry.getValue().metadata();
                 BinaryMetadata localMeta = localMetaHolder.metadata();
 
-                BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(localMeta, newMeta);
+                BinaryMetadata mergedMeta = mergeMetadata(localMeta, newMeta);
 
                 if (mergedMeta != localMeta) {
                     //put mergedMeta to local cache and store to disk
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
index e031e4f..b49f395 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
@@ -103,7 +103,7 @@ public class BinaryTypeRegistrationTest extends GridCommonAbstractTest {
         exec.shutdown();
         exec.awaitTermination(10, TimeUnit.SECONDS);
 
-        assertEquals(1, metadataUpdateProposedMessages.size());
+        assertEquals(metadataUpdateProposedMessages.toString(), 1, metadataUpdateProposedMessages.size());
     }
 
     /**