You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2019/03/27 14:47:21 UTC
[ignite] branch master updated: IGNITE-11605 Recheck metadata
difference after put pending future - Fixes #6324.
This is an automated email from the ASF dual-hosted git repository.
agoncharuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ignite.git
The following commit(s) were added to refs/heads/master by this push:
new 30a3d23 IGNITE-11605 Recheck metadata difference after put pending future - Fixes #6324.
30a3d23 is described below
commit 30a3d2361ad93e61b1036d2b3ff916ece13db904
Author: Anton Kalashnikov <ka...@yandex.ru>
AuthorDate: Wed Mar 27 17:29:04 2019 +0300
IGNITE-11605 Recheck metadata difference after put pending future - Fixes #6324.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
---
.../cache/binary/BinaryMetadataTransport.java | 97 +++++++++++++++++-----
.../binary/CacheObjectBinaryProcessorImpl.java | 94 ++++++++++-----------
.../cache/BinaryTypeRegistrationTest.java | 2 +-
3 files changed, 121 insertions(+), 72 deletions(-)
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
index d2fe972..0d2f6f9 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -20,6 +20,7 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
@@ -37,7 +38,6 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.binary.BinaryMetadata;
-import org.apache.ignite.internal.binary.BinaryUtils;
import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -53,6 +53,7 @@ import org.jetbrains.annotations.Nullable;
import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
/**
@@ -163,27 +164,66 @@ final class BinaryMetadataTransport {
/**
* Sends request to cluster proposing update for given metadata.
*
- * @param metadata Metadata proposed for update.
+ * @param newMeta Metadata proposed for update.
* @return Future to wait for update result on.
*/
- GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) {
- MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture(metadata.typeId());
+ GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata newMeta) {
+ int typeId = newMeta.typeId();
- if (log.isDebugEnabled())
- log.debug("Requesting metadata update for " + metadata.typeId() + "; caller thread is blocked on future "
- + resFut);
+ MetadataUpdateResultFuture resFut;
- MetadataUpdateResultFuture oldFut = pendingTypeIdMap.putIfAbsent(metadata.typeId(), resFut);
+ do {
+ BinaryMetadataHolder metaHolder = metaLocCache.get(typeId);
+
+ BinaryMetadata oldMeta = Optional.ofNullable(metaHolder)
+ .map(BinaryMetadataHolder::metadata)
+ .orElse(null);
+
+ BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, null);
+
+ if (mergedMeta == oldMeta) {
+ if (metaHolder.pendingVersion() == metaHolder.acceptedVersion())
+ return null;
+
+ return awaitMetadataUpdate(typeId, metaHolder.pendingVersion());
+ }
+
+ resFut = new MetadataUpdateResultFuture(typeId);
+ }
+ while (!putAndWaitPendingUpdate(typeId, resFut));
+
+ BinaryMetadataHolder metadataHolder = metaLocCache.get(typeId);
+
+ BinaryMetadata oldMeta = Optional.ofNullable(metadataHolder)
+ .map(BinaryMetadataHolder::metadata)
+ .orElse(null);
+
+ Set<Integer> changedSchemas = new LinkedHashSet<>();
+
+ //Ensure after putting pending future, metadata still has difference.
+ BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta, changedSchemas);
+
+ if (mergedMeta == oldMeta) {
+ resFut.onDone(MetadataUpdateResult.createSuccessfulResult());
- if(oldFut != null)
return null;
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Requesting metadata update [typeId=" + typeId +
+ ", typeName=" + mergedMeta.typeName() +
+ ", changedSchemas=" + changedSchemas +
+ ", holder=" + metadataHolder +
+ ", fut=" + resFut +
+ ']');
+ }
try {
synchronized (this) {
unlabeledFutures.add(resFut);
if (!stopping)
- discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, ctx.localNodeId()));
+ discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(mergedMeta, ctx.localNodeId()));
else
resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
}
@@ -199,6 +239,31 @@ final class BinaryMetadataTransport {
}
/**
+ * Put new update future and it are waiting pending future if it exists.
+ *
+ * @param typeId Type id.
+ * @param metaUpdateFut New metadata update future.
+ * @return {@code true} If given future put successfully.
+ */
+ private boolean putAndWaitPendingUpdate(int typeId, MetadataUpdateResultFuture metaUpdateFut) {
+ MetadataUpdateResultFuture oldFut = pendingTypeIdMap.putIfAbsent(typeId, metaUpdateFut);
+
+ if (oldFut != null) {
+ try {
+ oldFut.get();
+ }
+ catch (IgniteCheckedException ignore) {
+ //Stacktrace will be logged in thread which created this future.
+ log.warning("Pending update metadata process was failed. Trying to update to new metadata.");
+ }
+
+ return false;
+ }
+
+ return true;
+ }
+
+ /**
* Allows thread to wait for a metadata of given typeId and version to be accepted by the cluster.
*
* @param typeId ID of binary type.
@@ -223,14 +288,6 @@ final class BinaryMetadataTransport {
}
/**
- * @param typeId Type id.
- * @return Pending meta update future.
- */
- GridFutureAdapter<MetadataUpdateResult> getPendingMetaUpdate(int typeId) {
- return pendingTypeIdMap.get(typeId);
- }
-
- /**
* Await specific schema update.
* @param typeId Type id.
* @param schemaId Schema id.
@@ -331,7 +388,7 @@ final class BinaryMetadataTransport {
try {
Set<Integer> changedSchemas = new LinkedHashSet<>();
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas);
+ BinaryMetadata mergedMeta = mergeMetadata(locMeta, msg.metadata(), changedSchemas);
if (log.isDebugEnabled())
log.debug("Versions are stamped on coordinator" +
@@ -410,7 +467,7 @@ final class BinaryMetadataTransport {
Set<Integer> changedSchemas = new LinkedHashSet<>();
try {
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata(), changedSchemas);
+ BinaryMetadata mergedMeta = mergeMetadata(locMeta, msg.metadata(), changedSchemas);
BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 3dc4b45..4cce520 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.processors.cache.binary;
+import javax.cache.CacheException;
import java.io.File;
import java.io.Serializable;
import java.math.BigDecimal;
@@ -26,14 +27,11 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import javax.cache.CacheException;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteClientDisconnectedException;
@@ -121,6 +119,7 @@ import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION
import static org.apache.ignite.IgniteSystemProperties.IGNITE_WAIT_SCHEMA_UPDATE;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC;
+import static org.apache.ignite.internal.binary.BinaryUtils.mergeMetadata;
/**
* Binary processor implementation.
@@ -223,9 +222,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
BinaryMetadata oldMeta = holder != null ? holder.metadata() : null;
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(
- oldMeta, ((BinaryTypeImpl)newMeta).metadata()
- );
+ BinaryMetadata mergedMeta = mergeMetadata(oldMeta, ((BinaryTypeImpl)newMeta).metadata());
if (oldMeta != mergedMeta)
metadataLocCache.put(typeId, new BinaryMetadataHolder(mergedMeta, 0, 0));
@@ -536,52 +533,23 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
- try {
- GridFutureAdapter<MetadataUpdateResult> fut;
- Set<Integer> changedSchemas;
- BinaryMetadataHolder metaHolder;
-
- do {
- metaHolder = metadataLocCache.get(typeId);
-
- BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
-
- changedSchemas = new LinkedHashSet<>();
-
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0, changedSchemas);
-
- if (mergedMeta != oldMeta) {
- if (failIfUnregistered)
- throw new UnregisteredBinaryTypeException(typeId, mergedMeta);
+ if (failIfUnregistered) {
+ failIfUnregistered(typeId, newMeta0);
- fut = transport.requestMetadataUpdate(mergedMeta);
- }
- else {
- if (metaHolder.pendingVersion() == metaHolder.acceptedVersion())
- return;
+ return;
+ }
- // Metadata locally is up-to-date. Waiting for updating metadata in an entire cluster, if necessary.
- fut = transport.awaitMetadataUpdate(typeId, metaHolder.pendingVersion());
+ try {
+ GridFutureAdapter<MetadataUpdateResult> fut = transport.requestMetadataUpdate(newMeta0);
- if (failIfUnregistered && !fut.isDone())
- throw new UnregisteredBinaryTypeException(typeId, fut);
+ if (fut == null) {
+ if (log.isDebugEnabled()) {
+ log.debug("Metadata update was skipped [typeId=" + typeId
+ + ", typeName=" + newMeta.typeName() + ']');
}
- if (fut == null) {
- GridFutureAdapter<MetadataUpdateResult> pending = transport.getPendingMetaUpdate(typeId);
-
- if (pending != null) {
- try {
- pending.get();
- }
- catch (IgniteCheckedException ignore) {
- //Stacktrace will be logged in thread which created this future.
- log.warning("Pending update metadata process was failed. Trying to update to new metadata.");
- }
- }
- }
+ return;
}
- while (fut == null);
long t0 = System.nanoTime();
@@ -592,9 +560,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
log.debug("Completed metadata update [typeId=" + typeId +
", typeName=" + newMeta.typeName() +
- ", changedSchemas=" + changedSchemas +
", waitTime=" + MILLISECONDS.convert(System.nanoTime() - t0, NANOSECONDS) + "ms" +
- ", holder=" + metaHolder +
", fut=" + fut +
", tx=" + CU.txString(tx) +
']');
@@ -618,6 +584,32 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
}
}
+ /**
+ * Throw specific exception if given binary metadata is unregistered.
+ *
+ * @param typeId Type id.
+ * @param newMeta0 Expected binary metadata.
+ */
+ private void failIfUnregistered(int typeId, BinaryMetadata newMeta0) {
+ BinaryMetadataHolder metaHolder = metadataLocCache.get(typeId);
+
+ BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
+
+ BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0);
+
+ if (mergedMeta != oldMeta)
+ throw new UnregisteredBinaryTypeException(typeId, mergedMeta);
+
+ if (metaHolder.pendingVersion() == metaHolder.acceptedVersion())
+ return;
+
+ // Metadata locally is up-to-date. Waiting for updating metadata in an entire cluster, if necessary.
+ GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(typeId, metaHolder.pendingVersion());
+
+ if (!fut.isDone())
+ throw new UnregisteredBinaryTypeException(typeId, fut);
+ }
+
/** {@inheritDoc} */
@Override public void addMetaLocally(int typeId, BinaryType newMeta) throws BinaryObjectException {
assert newMeta != null;
@@ -630,7 +622,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
try {
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+ BinaryMetadata mergedMeta = mergeMetadata(oldMeta, newMeta0);
if (!ctx.clientNode())
metadataFileStore.mergeAndWriteMetadata(mergedMeta);
@@ -1302,7 +1294,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
continue;
try {
- BinaryUtils.mergeMetadata(locMeta, rmtMeta);
+ mergeMetadata(locMeta, rmtMeta);
}
catch (Exception e) {
String locMsg = "Exception was thrown when merging binary metadata from node %s: %s";
@@ -1361,7 +1353,7 @@ public class CacheObjectBinaryProcessorImpl extends GridProcessorAdapter impleme
BinaryMetadata newMeta = metaEntry.getValue().metadata();
BinaryMetadata localMeta = localMetaHolder.metadata();
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(localMeta, newMeta);
+ BinaryMetadata mergedMeta = mergeMetadata(localMeta, newMeta);
if (mergedMeta != localMeta) {
//put mergedMeta to local cache and store to disk
diff --git a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
index e031e4f..b49f395 100644
--- a/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
+++ b/modules/indexing/src/test/java/org/apache/ignite/internal/processors/cache/BinaryTypeRegistrationTest.java
@@ -103,7 +103,7 @@ public class BinaryTypeRegistrationTest extends GridCommonAbstractTest {
exec.shutdown();
exec.awaitTermination(10, TimeUnit.SECONDS);
- assertEquals(1, metadataUpdateProposedMessages.size());
+ assertEquals(metadataUpdateProposedMessages.toString(), 1, metadataUpdateProposedMessages.size());
}
/**