You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 11:50:15 UTC
[46/50] [abbrv] ignite git commit: IGNITE-4302 - Use custom messages
to exchange binary metadata - Fixes #1655.
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataRequestMessage.java
new file mode 100644
index 0000000..87bf805
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataRequestMessage.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * As {@link DiscoveryCustomMessage} messages are delivered to client nodes asynchronously
+ * it is possible that server nodes are allowed to send to clients some BinaryObjects clients don't have metadata for.
+ *
+ * When client detects obsolete metadata (by checking if current version of metadata has schemaId)
+ * it requests up-to-date metadata using communication SPI.
+ *
+ * API to make a request is provided by {@link BinaryMetadataTransport#requestUpToDateMetadata(int)} method.
+ */
+public class MetadataRequestMessage implements Message {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Type ID carried by this request. */
+ private int typeId;
+
+ /**
+ * Default constructor; {@code typeId} keeps its default value until {@link #readFrom} assigns it.
+ */
+ public MetadataRequestMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param typeId Type ID to put into the request.
+ */
+ MetadataRequestMessage(int typeId) {
+ this.typeId = typeId;
+ }
+
+ /** {@inheritDoc} Writes the header once, then the single {@code typeId} field. */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0: // The only state: this message has a single field.
+ if (!writer.writeInt("typeId", typeId))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} Reads the single {@code typeId} field that {@code writeTo} emits. */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0: // Mirrors state 0 of writeTo.
+ typeId = reader.readInt("typeId");
+
+ if (!reader.isLastRead()) // Presumably the value was not fully available in the buffer yet - TODO confirm.
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(MetadataRequestMessage.class);
+ }
+
+ /** {@inheritDoc} This message uses the fixed direct type code {@code 80}. */
+ @Override public short directType() {
+ return 80;
+ }
+
+ /** {@inheritDoc} The only field is {@code typeId}. */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** @return Type ID carried by this request. */
+ public int typeId() {
+ return typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetadataRequestMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
new file mode 100644
index 0000000..bb3d3a3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Carries latest version of metadata to client as a response for {@link MetadataRequestMessage}.
+ */
+public class MetadataResponseMessage implements Message {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Type ID the response relates to. */
+    private int typeId;
+
+    /** Marshalled binary metadata; {@code null} when no metadata is attached. */
+    private byte[] binaryMetadataBytes;
+
+    /** Response status; assigned by {@link #binaryMetadataBytes(byte[])} or {@link #markErrorOnRequest()}. */
+    private ClientResponseStatus status;
+
+    /** Default constructor; fields are filled in by {@link #readFrom(ByteBuffer, MessageReader)}. */
+    public MetadataResponseMessage() {
+        // No-op.
+    }
+
+    /**
+     * Status stays unset until bytes are attached or an error is marked; {@code writeTo} requires it to be set.
+     * @param typeId Type id.
+     */
+    MetadataResponseMessage(int typeId) {
+        this.typeId = typeId;
+    }
+
+    /** {@inheritDoc} Writes the header once, then {@code typeId}, the status ordinal and the metadata bytes. */
+    @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+        writer.setBuffer(buf);
+
+        if (!writer.isHeaderWritten()) {
+            if (!writer.writeHeader(directType(), fieldsCount()))
+                return false;
+
+            writer.onHeaderWritten();
+        }
+
+        switch (writer.state()) { // Cases fall through on purpose: fields are written in order.
+            case 0:
+                if (!writer.writeInt("typeId", typeId))
+                    return false;
+
+                writer.incrementState();
+
+            case 1:
+                if (!writer.writeInt("status", status.ordinal()))
+                    return false;
+
+                writer.incrementState();
+
+            case 2:
+                if (!writer.writeByteArray("binMetaBytes", binaryMetadataBytes))
+                    return false;
+
+                writer.incrementState();
+        }
+
+        return true;
+    }
+
+    /** {@inheritDoc} Reads the fields in the same order {@code writeTo} emits them. */
+    @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+        reader.setBuffer(buf);
+
+        if (!reader.beforeMessageRead())
+            return false;
+
+        switch (reader.state()) { // Cases fall through on purpose: fields are read in order.
+            case 0:
+                typeId = reader.readInt("typeId");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 1:
+                status = ClientResponseStatus.values()[reader.readInt("status")];
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState();
+
+            case 2:
+                binaryMetadataBytes = reader.readByteArray("binMetaBytes");
+
+                if (!reader.isLastRead())
+                    return false;
+
+                reader.incrementState(); // Advance past the last field as well, matching writeTo.
+        }
+
+        return reader.afterMessageRead(MetadataResponseMessage.class);
+    }
+
+    /** {@inheritDoc} This message uses the fixed direct type code {@code 81}. */
+    @Override public short directType() {
+        return 81;
+    }
+
+    /** {@inheritDoc} Three fields: {@code typeId}, {@code status} and the metadata bytes. */
+    @Override public byte fieldsCount() {
+        return 3;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onAckReceived() {
+        // No-op.
+    }
+
+    /**
+     * @param bytes Binary metadata bytes; {@code null} sets the status to {@code METADATA_NOT_FOUND}.
+     */
+    void binaryMetadataBytes(byte[] bytes) {
+        if (bytes != null)
+            status = ClientResponseStatus.METADATA_FOUND;
+        else
+            status = ClientResponseStatus.METADATA_NOT_FOUND;
+
+        binaryMetadataBytes = bytes;
+    }
+
+    /**
+     * Marks message if any exception happened during preparing response.
+     */
+    void markErrorOnRequest() {
+        status = ClientResponseStatus.ERROR;
+    }
+
+    /**
+     * @return Type ID.
+     */
+    int typeId() {
+        return typeId;
+    }
+
+    /**
+     * @return Marshalled BinaryMetadata.
+     */
+    byte[] binaryMetadataBytes() {
+        return binaryMetadataBytes;
+    }
+
+    /**
+     * @return {@code true} if metadata was not found on server node replied with the response.
+     */
+    boolean metadataNotFound() {
+        return status == ClientResponseStatus.METADATA_NOT_FOUND;
+    }
+
+    /** Response statuses; the ordinal is what goes on the wire, so constant order must not change. */
+    private enum ClientResponseStatus {
+        /** Metadata bytes are attached. */
+        METADATA_FOUND,
+
+        /** No metadata bytes were provided. */
+        METADATA_NOT_FOUND,
+
+        /** Set by {@code markErrorOnRequest()}. */
+        ERROR
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MetadataResponseMessage.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
new file mode 100644
index 0000000..ef5370e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Acknowledge message for {@link MetadataUpdateProposedMessage}: see its javadoc for detailed description of protocol.
+ *
+ * As discovery messaging doesn't guarantee that a message makes only one pass across the cluster,
+ * <b>MetadataUpdateAcceptedMessage</b> can be marked as duplicated so that other nodes skip it instead of processing it again.
+ */
+public class MetadataUpdateAcceptedMessage implements DiscoveryCustomMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message ID, generated randomly for every instance. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** Type ID this acknowledgement refers to. */
+    private final int typeId;
+
+    /** Version that has been accepted. */
+    private final int acceptedVer;
+
+    /** Whether this message has been marked as a duplicate. */
+    private boolean duplicated;
+
+    /**
+     * @param typeId Type id.
+     * @param acceptedVer Accepted version.
+     */
+    MetadataUpdateAcceptedMessage(int typeId, int acceptedVer) {
+        this.acceptedVer = acceptedVer;
+        this.typeId = typeId;
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /** {@inheritDoc} Always {@code null} for this message. */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return null;
+    }
+
+    /** {@inheritDoc} Always {@code true} for this message. */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /** @return Accepted version. */
+    int acceptedVersion() {
+        return acceptedVer;
+    }
+
+    /** @return Type ID. */
+    public int typeId() {
+        return typeId;
+    }
+
+    /** @return Current value of the duplicated flag. */
+    public boolean duplicated() {
+        return duplicated;
+    }
+
+    /**
+     * @param duplicated New value of the duplicated flag.
+     */
+    public void duplicated(boolean duplicated) {
+        this.duplicated = duplicated;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MetadataUpdateAcceptedMessage.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
new file mode 100644
index 0000000..715e668
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.UUID;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryMetadataHandler;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * <b>MetadataUpdateProposedMessage</b> and {@link MetadataUpdateAcceptedMessage} messages make a basis for
+ * discovery-based protocol for exchanging {@link BinaryMetadata metadata} describing objects in binary format stored in Ignite caches.
+ *
+ * All interactions with binary metadata are performed through {@link BinaryMetadataHandler}
+ * interface implemented in {@link CacheObjectBinaryProcessorImpl} processor.
+ *
+ * Protocol works as follows:
+ * <ol>
+ * <li>
+ * Each thread aiming to add/update metadata sends <b>MetadataUpdateProposedMessage</b>
+ * and blocks until receiving acknowledge or reject for proposed update.
+ * </li>
+ * <li>
+ * Coordinator node checks whether proposed update is in conflict with current version of metadata
+ * for the same typeId.
+ * In case of conflict initial <b>MetadataUpdateProposedMessage</b> is marked rejected and sent to initiator.
+ * </li>
+ * <li>
+ * If there are no conflicts on coordinator, <b>pending version</b> for metadata of this typeId is bumped up by one;
+ * <b>MetadataUpdateProposedMessage</b> with <b>pending version</b> information is sent across the cluster.
+ * </li>
+ * <li>
+ * Each node on receiving non-rejected <b>MetadataUpdateProposedMessage</b> updates <b>pending version</b>
+ * for the typeId in metadata local cache.
+ * </li>
+ * <li>
+ * When <b>MetadataUpdateProposedMessage</b> finishes pass, {@link MetadataUpdateAcceptedMessage ack} is sent.
+ * Ack has the same <b>accepted version</b> as <b>pending version</b>
+ * of initial <b>MetadataUpdateProposedMessage</b> message.
+ * </li>
+ * <li>
+ * Each node on receiving <b>MetadataUpdateAcceptedMessage</b> updates accepted version for the typeId.
+ * All threads waiting for arrival of ack with this <b>accepted version</b> are unblocked.
+ * </li>
+ * </ol>
+ *
+ * If a thread on some node decides to read metadata which has ongoing update
+ * (with <b>pending version</b> strictly greater than <b>accepted version</b>)
+ * it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with <b>accepted version</b>
+ * equal to the <b>pending version</b> this metadata had at the moment when it was initially read by the thread.
+ */
+public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage {
+    /** Serialization version. */
+    private static final long serialVersionUID = 0L;
+
+    /** Message ID, generated randomly for every instance. */
+    private final IgniteUuid id = IgniteUuid.randomUuid();
+
+    /** ID of the node that requested the update. */
+    private final UUID origNodeId;
+
+    /** Metadata proposed for the update; replaceable through {@code metadata(BinaryMetadata)}. */
+    private BinaryMetadata metadata;
+
+    /** Type ID taken from the metadata passed to the constructor. */
+    private final int typeId;
+
+    /** Pending version. */
+    private int pendingVer;
+
+    /** Accepted version. */
+    private int acceptedVer;
+
+    /** Proposal status; {@code SUCCESSFUL} until the proposal is marked rejected. */
+    private ProposalStatus status = ProposalStatus.SUCCESSFUL;
+
+    /** Error passed to {@code markRejected(BinaryObjectException)}, if any. */
+    private BinaryObjectException err;
+
+    /**
+     * @param metadata {@link BinaryMetadata} requested to be updated.
+     * @param origNodeId ID of node requested update.
+     */
+    public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
+        assert origNodeId != null;
+        assert metadata != null;
+
+        this.metadata = metadata;
+
+        this.origNodeId = origNodeId;
+        typeId = metadata.typeId();
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public IgniteUuid id() {
+        return id;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+        return (status != ProposalStatus.SUCCESSFUL) ? null : new MetadataUpdateAcceptedMessage(typeId, pendingVer);
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override public boolean isMutable() {
+        return true;
+    }
+
+    /**
+     * @param err Error caused this update to be rejected.
+     */
+    void markRejected(BinaryObjectException err) {
+        this.err = err;
+        status = ProposalStatus.REJECTED;
+    }
+
+    /**
+     * @return {@code true} if this proposal has been marked rejected.
+     */
+    boolean rejected() {
+        return ProposalStatus.REJECTED == status;
+    }
+
+    /**
+     * @return Error passed to {@code markRejected}; {@code null} if none was set.
+     */
+    BinaryObjectException rejectionError() {
+        return err;
+    }
+
+    /**
+     * @return Pending version.
+     */
+    int pendingVersion() {
+        return pendingVer;
+    }
+
+    /**
+     * @param pendingVer New pending version.
+     */
+    void pendingVersion(int pendingVer) {
+        this.pendingVer = pendingVer;
+    }
+
+    /**
+     * @return Accepted version.
+     */
+    int acceptedVersion() {
+        return acceptedVer;
+    }
+
+    /**
+     * @param acceptedVer Accepted version.
+     */
+    void acceptedVersion(int acceptedVer) {
+        this.acceptedVer = acceptedVer;
+    }
+
+    /**
+     * @return ID of the node that requested the update.
+     */
+    UUID origNodeId() {
+        return origNodeId;
+    }
+
+    /**
+     * @return Metadata currently held by this message.
+     */
+    public BinaryMetadata metadata() {
+        return metadata;
+    }
+
+    /**
+     * @param metadata Metadata.
+     */
+    public void metadata(BinaryMetadata metadata) {
+        this.metadata = metadata;
+    }
+
+    /**
+     * @return Type ID.
+     */
+    public int typeId() {
+        return typeId;
+    }
+
+    /** Proposal statuses. */
+    private enum ProposalStatus {
+        /** Initial status of every proposal. */
+        SUCCESSFUL,
+
+        /** Set by {@code markRejected}. */
+        REJECTED
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(MetadataUpdateProposedMessage.class, this);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
new file mode 100644
index 0000000..6c299ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import org.apache.ignite.binary.BinaryObjectException;
+
+/**
+ * Represents result of metadata update or metadata read request (so it is used both by server and client nodes).
+ */
+final class MetadataUpdateResult {
+    /** Kind of this result. */
+    private final ResultType resType;
+
+    /** Error attached to this result; {@code null} unless created as a failure. */
+    private final BinaryObjectException error;
+
+    /**
+     * @param resType Response type.
+     * @param error Error.
+     */
+    private MetadataUpdateResult(ResultType resType, BinaryObjectException error) {
+        this.error = error;
+        this.resType = resType;
+    }
+
+    /**
+     * @return {@code true} if this result was created as a failure.
+     */
+    boolean rejected() {
+        return ResultType.REJECT == resType;
+    }
+
+    /**
+     * @return Error attached to this result; {@code null} for non-failure results.
+     */
+    BinaryObjectException error() {
+        return error;
+    }
+
+    /**
+     * @return New result of type {@code SUCCESS} without an error.
+     */
+    static MetadataUpdateResult createSuccessfulResult() {
+        return new MetadataUpdateResult(ResultType.SUCCESS, null);
+    }
+
+    /**
+     * @param err Error that led to the request failure; must not be {@code null}.
+     */
+    static MetadataUpdateResult createFailureResult(BinaryObjectException err) {
+        assert err != null;
+
+        return new MetadataUpdateResult(ResultType.REJECT, err);
+    }
+
+    /**
+     * @return New result of type {@code UPDATE_DISABLED} without an error.
+     */
+    static MetadataUpdateResult createUpdateDisabledResult() {
+        return new MetadataUpdateResult(ResultType.UPDATE_DISABLED, null);
+    }
+
+    /**
+     * Possible kinds of result.
+     */
+    private enum ResultType {
+        /**
+         * The request completed successfully.
+         */
+        SUCCESS,
+
+        /**
+         * The request was rejected for any reason.
+         */
+        REJECT,
+
+        /**
+         * The request arrived while the node was stopping.
+         */
+        UPDATE_DISABLED
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 6c8df14..d65ddf5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -138,7 +138,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private boolean locCache;
/** */
- private transient boolean keepBinary;
+ private boolean keepBinary;
/** */
private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
@@ -1389,6 +1389,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
out.writeBoolean(sync);
out.writeBoolean(ignoreExpired);
out.writeInt(taskHash);
+ out.writeBoolean(keepBinary);
}
/** {@inheritDoc} */
@@ -1410,6 +1411,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
sync = in.readBoolean();
ignoreExpired = in.readBoolean();
taskHash = in.readInt();
+ keepBinary = in.readBoolean();
cacheId = CU.cacheId(cacheName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 66c19a0..804e889 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -137,10 +136,10 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
/**
* Adds a listener to be notified when mapping changes.
*
- * @param mappingUpdatedListener listener for mapping updated events.
+ * @param lsnr listener for mapping updated events.
*/
- public void addMappingUpdatedListener(MappingUpdatedListener mappingUpdatedListener) {
- mappingUpdatedLsnrs.add(mappingUpdatedListener);
+ public void addMappingUpdatedListener(MappingUpdatedListener lsnr) {
+ mappingUpdatedLsnrs.add(lsnr);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index bf44723..d37eb9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -64,6 +64,8 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
@@ -1492,6 +1494,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
else
return;
+ if (msg instanceof MetadataUpdateProposedMessage || msg instanceof MetadataUpdateAcceptedMessage)
+ return;
+
topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java
index e533a70..84e37ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java
@@ -97,6 +97,8 @@ public class GridTaskExecutionSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testJobIdCollision() throws Exception {
+ fail("Test refactoring is needed: https://issues.apache.org/jira/browse/IGNITE-4706");
+
long locId = IgniteUuid.lastLocalId();
ArrayList<IgniteFuture<Object>> futs = new ArrayList<>(2016);
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
index 0f48961..704c8f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
@@ -42,4 +42,9 @@ public class TestCachingMetadataHandler implements BinaryMetadataHandler {
@Override public BinaryType metadata(int typeId) throws BinaryObjectException {
return metas.get(typeId);
}
+
+ /** {@inheritDoc} This test handler ignores both arguments and always returns {@code null}. */
+ @Override public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
new file mode 100644
index 0000000..b76279d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.binary.BinaryObjectImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest {
+ /** */
+ private static final String SEQ_NUM_FLD = "f0";
+
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private volatile boolean clientMode;
+
+ /** */
+ private volatile boolean applyDiscoveryHook;
+
+ /** */
+ private volatile DiscoveryHook discoveryHook;
+
+ /** */
+ private static final int UPDATES_COUNT = 5_000;
+
+ /** */
+ private static final int RESTART_DELAY = 3_000;
+
+ /** */
+ private final Queue<BinaryUpdateDescription> updatesQueue = new LinkedBlockingDeque<>(UPDATES_COUNT);
+
+ /** */
+ private static volatile BlockingDeque<Integer> srvResurrectQueue = new LinkedBlockingDeque<>(1);
+
+ /** */
+ private static final CountDownLatch START_LATCH = new CountDownLatch(1);
+
+ /** */
+ private static final CountDownLatch FINISH_LATCH_NO_CLIENTS = new CountDownLatch(5);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag0 = new AtomicBoolean(false);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag1 = new AtomicBoolean(false);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag2 = new AtomicBoolean(false);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag3 = new AtomicBoolean(false);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag4 = new AtomicBoolean(false);
+
+ /** */
+ private static final String BINARY_TYPE_NAME = "TestBinaryType";
+
+ /** */
+ private static final int BINARY_TYPE_ID = 708045005;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ for (int i = 0; i < UPDATES_COUNT; i++) {
+ FieldType fType = null;
+ switch (i % 4) {
+ case 0:
+ fType = FieldType.NUMBER;
+ break;
+ case 1:
+ fType = FieldType.STRING;
+ break;
+ case 2:
+ fType = FieldType.ARRAY;
+ break;
+ case 3:
+ fType = FieldType.OBJECT;
+ }
+
+ updatesQueue.add(new BinaryUpdateDescription(i, "f" + (i + 1), fType));
+ }
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Builds a configuration with binary marshaller, a single replicated cache and the shared IP finder.
+     * When {@code applyDiscoveryHook} is set, the discovery SPI is replaced by one that wraps its listener
+     * with the configured (or a no-op) {@link DiscoveryHook}.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        igniteCfg.setPeerClassLoadingEnabled(false);
+
+        if (applyDiscoveryHook) {
+            final DiscoveryHook hook = discoveryHook != null ? discoveryHook : new DiscoveryHook();
+
+            // Every listener registered on this SPI gets wrapped, so the hook sees messages first.
+            TcpDiscoverySpi hookedSpi = new TcpDiscoverySpi() {
+                @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+                    super.setListener(GridTestUtils.DiscoverySpiListenerWrapper.wrap(lsnr, hook));
+                }
+            };
+
+            hookedSpi.setHeartbeatFrequency(1000);
+
+            igniteCfg.setDiscoverySpi(hookedSpi);
+        }
+
+        TcpDiscoverySpi activeSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        activeSpi.setIpFinder(ipFinder);
+
+        igniteCfg.setMarshaller(new BinaryMarshaller());
+        igniteCfg.setClientMode(clientMode);
+
+        CacheConfiguration replicatedCacheCfg = new CacheConfiguration();
+
+        replicatedCacheCfg.setCacheMode(CacheMode.REPLICATED);
+
+        igniteCfg.setCacheConfiguration(replicatedCacheCfg);
+
+        return igniteCfg;
+    }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops all grids started by the test.
+ */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+    /**
+     * Starts a server node with the given index and asynchronously submits a {@link BinaryObjectAdder}
+     * job that runs on that very node.
+     *
+     * @param idx Index of the node to start.
+     * @param stopFlag Flag passed to the job.
+     * @throws Exception If node start failed.
+     */
+    private void startComputation(int idx, AtomicBoolean stopFlag) throws Exception {
+        clientMode = false;
+
+        final IgniteEx srv = startGrid(idx);
+
+        // Cluster group consisting of the freshly started node only.
+        ClusterGroup locGrp = srv.cluster().forNodeId(srv.localNode().id());
+
+        BinaryObjectAdder adder = new BinaryObjectAdder(srv, updatesQueue, 30, stopFlag);
+
+        srv.compute(locGrp).withAsync().call(adder);
+    }
+
+    /**
+     * Starts a client node with the given index and registers a continuous query on the default cache
+     * whose local listener records sequence numbers of observed values.
+     *
+     * @param idx Index of the client node to start.
+     * @param deafClient If {@code true}, the client is started with a discovery hook that rewrites the type id
+     *      of metadata update messages for the test binary type, so the client does not see them as updates
+     *      of that type.
+     * @param observedIds Set collecting the sequence numbers observed by the listener.
+     * @throws Exception If node start failed.
+     */
+    private void startListening(int idx, boolean deafClient, Set<Integer> observedIds) throws Exception {
+        clientMode = true;
+
+        ContinuousQuery qry = new ContinuousQuery();
+
+        qry.setLocalListener(new CQListener(observedIds));
+
+        applyDiscoveryHook = deafClient;
+
+        if (deafClient) {
+            discoveryHook = new DiscoveryHook() {
+                @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+                    DiscoveryCustomMessage customMsg = msg == null ? null
+                        : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+
+                    boolean targetsTestType;
+
+                    if (customMsg instanceof MetadataUpdateProposedMessage)
+                        targetsTestType = ((MetadataUpdateProposedMessage) customMsg).typeId() == BINARY_TYPE_ID;
+                    else if (customMsg instanceof MetadataUpdateAcceptedMessage)
+                        targetsTestType = ((MetadataUpdateAcceptedMessage) customMsg).typeId() == BINARY_TYPE_ID;
+                    else
+                        targetsTestType = false;
+
+                    // Redirect the message to another type id.
+                    if (targetsTestType)
+                        GridTestUtils.setFieldValue(customMsg, "typeId", 1);
+                }
+            };
+        }
+
+        IgniteEx client = startGrid(idx);
+
+        client.cache(null).withKeepBinary().query(qry);
+    }
+
+    /**
+     * Continuous query listener that extracts the sequence number field from every updated binary value
+     * and stores it into the supplied set.
+     */
+    private static class CQListener implements CacheEntryUpdatedListener {
+        /** Sequence numbers seen so far. */
+        private final Set<Integer> observedIds;
+
+        /**
+         * @param observedIds Set to collect observed sequence numbers into.
+         */
+        CQListener(Set<Integer> observedIds) {
+            this.observedIds = observedIds;
+        }
+
+        /** {@inheritDoc} */
+        @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+            for (Object evt : iterable) {
+                // Events of any other kind are ignored.
+                if (!(evt instanceof CacheQueryEntryEvent))
+                    continue;
+
+                BinaryObjectImpl val = (BinaryObjectImpl) ((CacheQueryEntryEvent) evt).getValue();
+
+                Integer seqNum = val.field(SEQ_NUM_FLD);
+
+                observedIds.add(seqNum);
+            }
+        }
+    }
+
+ /**
+ *
+ */
+ public void testFlowNoConflicts() throws Exception {
+ startComputation(0, stopFlag0);
+
+ startComputation(1, stopFlag1);
+
+ startComputation(2, stopFlag2);
+
+ startComputation(3, stopFlag3);
+
+ startComputation(4, stopFlag4);
+
+ Thread killer = new Thread(new ServerNodeKiller());
+ Thread resurrection = new Thread(new ServerNodeResurrection());
+ killer.setName("node-killer-thread");
+ killer.start();
+ resurrection.setName("node-resurrection-thread");
+ resurrection.start();
+
+ START_LATCH.countDown();
+
+ while (!updatesQueue.isEmpty())
+ Thread.sleep(1000);
+
+ FINISH_LATCH_NO_CLIENTS.await();
+
+ IgniteEx ignite0 = grid(0);
+
+ IgniteCache<Object, Object> cache0 = ignite0.cache(null);
+
+ int cacheEntries = cache0.size(CachePeekMode.PRIMARY);
+
+ assertTrue("Cache cannot contain more entries than were put in it;", cacheEntries <= UPDATES_COUNT);
+
+ assertEquals("There are less than expected entries, data loss occurred;", UPDATES_COUNT, cacheEntries);
+
+ killer.interrupt();
+ resurrection.interrupt();
+ }
+
+ /**
+ *
+ */
+ public void testFlowNoConflictsWithClients() throws Exception {
+ startComputation(0, stopFlag0);
+
+ startComputation(1, stopFlag1);
+
+ startComputation(2, stopFlag2);
+
+ startComputation(3, stopFlag3);
+
+ startComputation(4, stopFlag4);
+
+ final Set<Integer> deafClientObservedIds = new ConcurrentHashSet<>();
+
+ startListening(5, true, deafClientObservedIds);
+
+ final Set<Integer> regClientObservedIds = new ConcurrentHashSet<>();
+
+ startListening(6, false, regClientObservedIds);
+
+ START_LATCH.countDown();
+
+ Thread killer = new Thread(new ServerNodeKiller());
+ Thread resurrection = new Thread(new ServerNodeResurrection());
+ killer.setName("node-killer-thread");
+ killer.start();
+ resurrection.setName("node-resurrection-thread");
+ resurrection.start();
+
+ while (!updatesQueue.isEmpty())
+ Thread.sleep(1000);
+
+ killer.interrupt();
+ resurrection.interrupt();
+ }
+
+    /**
+     * Runnable that repeatedly picks a random server node, raises its stop flag, waits until the flag
+     * is cleared again, stops the node and hands its index over to the resurrection queue.
+     */
+    private final class ServerNodeKiller implements Runnable {
+        /** {@inheritDoc} */
+        @Override public void run() {
+            Thread self = Thread.currentThread();
+
+            try {
+                START_LATCH.await();
+
+                while (!self.isInterrupted()) {
+                    int idx = ThreadLocalRandom.current().nextInt(5);
+
+                    // Index is always within [0, 5), so it maps directly onto the per-node flags.
+                    AtomicBoolean[] flags = {stopFlag0, stopFlag1, stopFlag2, stopFlag3, stopFlag4};
+
+                    AtomicBoolean stopFlag = flags[idx];
+
+                    stopFlag.set(true);
+
+                    // Wait for the flag to be reset before stopping the node.
+                    while (stopFlag.get())
+                        Thread.sleep(10);
+
+                    stopGrid(idx);
+
+                    srvResurrectQueue.put(idx);
+
+                    Thread.sleep(RESTART_DELAY);
+                }
+            }
+            catch (Exception ignored) {
+                // No-op.
+            }
+        }
+    }
+
+ /**
+ * {@link Runnable} object to restart nodes killed by {@link ServerNodeKiller}.
+ */
+ private final class ServerNodeResurrection implements Runnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ Thread curr = Thread.currentThread();
+
+ try {
+ START_LATCH.await();
+
+ while (!curr.isInterrupted()) {
+ Integer idx = srvResurrectQueue.takeFirst();
+
+ AtomicBoolean stopFlag;
+
+ switch (idx) {
+ case 0:
+ stopFlag = stopFlag0;
+ break;
+ case 1:
+ stopFlag = stopFlag1;
+ break;
+ case 2:
+ stopFlag = stopFlag2;
+ break;
+ case 3:
+ stopFlag = stopFlag3;
+ break;
+ default:
+ stopFlag = stopFlag4;
+ }
+
+ clientMode = false;
+ applyDiscoveryHook = false;
+
+ startComputation(idx, stopFlag);
+ }
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+ }
+ }
+
+    /**
+     * Immutable instruction for a node to <b>add a new binary object</b> to the cache in <b>keepBinary</b> mode.
+     *
+     * It carries the id the object is stored under, the name of the new field to add to the binary schema
+     * and the {@link FieldType type} of that field.
+     */
+    private static final class BinaryUpdateDescription {
+        /** Id (cache key) of the item. */
+        private final int itemId;
+
+        /** Name of the field to add. */
+        private final String fieldName;
+
+        /** Type of the field to add. */
+        private final FieldType fieldType;
+
+        /**
+         * @param itemId Item id.
+         * @param fieldName Field name.
+         * @param fieldType Field type.
+         */
+        private BinaryUpdateDescription(int itemId, String fieldName, FieldType fieldType) {
+            this.fieldType = fieldType;
+            this.fieldName = fieldName;
+            this.itemId = itemId;
+        }
+    }
+
+ /**
+ * Kind of value stored in the field that an update adds to the binary type.
+ */
+ private enum FieldType {
+ /** Field holding a random int. */
+ NUMBER,
+
+ /** Field holding a generated string. */
+ STRING,
+
+ /** Field holding a random byte array. */
+ ARRAY,
+
+ /** Field holding a plain {@link Object} instance. */
+ OBJECT
+ }
+
+ /**
+ * Generates random number to use when creating binary object with field of numeric {@link FieldType type}.
+ *
+ * @return Random int in range {@code [0, 100)}.
+ */
+ private static int getNumberFieldVal() {
+ return ThreadLocalRandom.current().nextInt(100);
+ }
+
+ /**
+ * Generates random string to use when creating binary object with field of string {@link FieldType type}.
+ *
+ * @return One of the strings {@code "str100"} .. {@code "str108"}.
+ */
+ private static String getStringFieldVal() {
+ return "str" + (100 + ThreadLocalRandom.current().nextInt(9));
+ }
+
+ /**
+ * Generates random array to use when creating binary object with field of array {@link FieldType type}.
+ *
+ * @return Array of three random bytes.
+ */
+ private static byte[] getArrayFieldVal() {
+ byte[] res = new byte[3];
+ ThreadLocalRandom.current().nextBytes(res);
+ return res;
+ }
+
+ /**
+ * @param builder Builder.
+ * @param desc Descriptor with parameters of BinaryObject to build.
+ * @return BinaryObject built by provided description
+ */
+ private static BinaryObject newBinaryObject(BinaryObjectBuilder builder, BinaryUpdateDescription desc) {
+ builder.setField(SEQ_NUM_FLD, desc.itemId + 1);
+
+ switch (desc.fieldType) {
+ case NUMBER:
+ builder.setField(desc.fieldName, getNumberFieldVal());
+ break;
+ case STRING:
+ builder.setField(desc.fieldName, getStringFieldVal());
+ break;
+ case ARRAY:
+ builder.setField(desc.fieldName, getArrayFieldVal());
+ break;
+ case OBJECT:
+ builder.setField(desc.fieldName, new Object());
+ }
+
+ return builder.build();
+ }
+
+    /**
+     * Compute job executed on each node in cluster which constantly adds new entries to ignite cache
+     * according to {@link BinaryUpdateDescription descriptions} it reads from shared queue.
+     */
+    private static final class BinaryObjectAdder implements IgniteCallable<Object> {
+        /** Node whose cache and binary API are used by the job. */
+        private final IgniteEx ignite;
+
+        /** Queue of pending updates; it is shared between all adder jobs. */
+        private final Queue<BinaryUpdateDescription> updatesQueue;
+
+        /** Pause between two consecutive puts, in milliseconds. */
+        private final long timeout;
+
+        /** Flag that makes the job stop when raised; the job clears it on exit. */
+        private final AtomicBoolean stopFlag;
+
+        /**
+         * @param ignite Ignite.
+         * @param updatesQueue Updates queue.
+         * @param timeout Timeout.
+         * @param stopFlag Stop flag.
+         */
+        BinaryObjectAdder(IgniteEx ignite, Queue<BinaryUpdateDescription> updatesQueue, long timeout, AtomicBoolean stopFlag) {
+            this.ignite = ignite;
+            this.updatesQueue = updatesQueue;
+            this.timeout = timeout;
+            this.stopFlag = stopFlag;
+        }
+
+        /**
+         * Waits for the start latch, then takes descriptions from the queue one by one and puts the
+         * corresponding binary objects into the cache until the queue is drained or the stop flag is raised.
+         *
+         * @return Always {@code null}.
+         * @throws Exception If interrupted or if cache operation failed.
+         */
+        @Override public Object call() throws Exception {
+            START_LATCH.await();
+
+            IgniteCache<Object, Object> cache = ignite.cache(null).withKeepBinary();
+
+            BinaryUpdateDescription desc;
+
+            // The queue is drained by several jobs concurrently, so "isEmpty() then poll()" is racy:
+            // another job may take the last element in between and poll() would return null.
+            // Looping on the result of poll() makes check-and-take a single atomic step.
+            while ((desc = updatesQueue.poll()) != null) {
+                BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
+
+                BinaryObject bo = newBinaryObject(builder, desc);
+
+                cache.put(desc.itemId, bo);
+
+                if (stopFlag.get())
+                    break;
+
+                Thread.sleep(timeout);
+            }
+
+            if (updatesQueue.isEmpty())
+                FINISH_LATCH_NO_CLIENTS.countDown();
+
+            // Signals whoever raised the flag that the job is done with this node.
+            stopFlag.set(false);
+
+            return null;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
new file mode 100644
index 0000000..2dcfab8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import junit.framework.AssertionFailedError;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
+import org.apache.ignite.testframework.GridTestUtils.DiscoverySpiListenerWrapper;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ *
+ */
+public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean clientMode;
+
+ /** */
+ private boolean applyDiscoveryHook;
+
+ /** */
+ private DiscoveryHook discoveryHook;
+
+ /** */
+ private static final String BINARY_TYPE_NAME = "TestBinaryType";
+
+ /** */
+ private static final int BINARY_TYPE_ID = 708045005;
+
+ /** */
+ private static final AtomicInteger metadataReqsCounter = new AtomicInteger(0);
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Builds a configuration with binary marshaller, a single replicated cache and the shared IP finder.
+     * When {@code applyDiscoveryHook} is set, the discovery SPI is replaced by one that wraps its listener
+     * with the configured (or a no-op) {@link DiscoveryHook}.
+     */
+    @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+        IgniteConfiguration igniteCfg = super.getConfiguration(gridName);
+
+        if (applyDiscoveryHook) {
+            final DiscoveryHook hook = discoveryHook != null ? discoveryHook : new DiscoveryHook();
+
+            // Every listener registered on this SPI gets wrapped, so the hook sees messages first.
+            TcpDiscoverySpi hookedSpi = new TcpDiscoverySpi() {
+                @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+                    super.setListener(DiscoverySpiListenerWrapper.wrap(lsnr, hook));
+                }
+            };
+
+            igniteCfg.setDiscoverySpi(hookedSpi);
+        }
+
+        TcpDiscoverySpi activeSpi = (TcpDiscoverySpi)igniteCfg.getDiscoverySpi();
+
+        activeSpi.setIpFinder(ipFinder);
+
+        igniteCfg.setMarshaller(new BinaryMarshaller());
+        igniteCfg.setClientMode(clientMode);
+
+        CacheConfiguration replicatedCacheCfg = new CacheConfiguration();
+
+        replicatedCacheCfg.setCacheMode(CacheMode.REPLICATED);
+
+        igniteCfg.setCacheConfiguration(replicatedCacheCfg);
+
+        return igniteCfg;
+    }
+
+ /**
+ *
+ */
+ private static final class ErrorHolder {
+ /** */
+ private volatile Error e;
+
+ /**
+ * @param e Exception.
+ */
+ void error(Error e) {
+ this.e = e;
+ }
+
+ /**
+ *
+ */
+ void fail() {
+ throw e;
+ }
+
+ /**
+ *
+ */
+ boolean isEmpty() {
+ return e == null;
+ }
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Stops all grids started by the test.
+ */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** */
+ private static final CountDownLatch LATCH1 = new CountDownLatch(1);
+
+ /**
+ * Verifies that if thread tries to read metadata with ongoing update it gets blocked
+ * until acknowledge message arrives.
+ * <p>
+ * Node 0 is started with a hook that delays handling of the accepted message for the test type,
+ * node 1 is started without any hook, and node 2 is started with a hook that inspects the
+ * {@code syncMap} of the metadata transport when the accepted message for version 2 arrives.
+ * Assertion errors raised inside the hook are carried to the test thread via {@link ErrorHolder}.
+ *
+ * @throws Exception If failed.
+ */
+ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception {
+ applyDiscoveryHook = true;
+ // Hook for node 0: slows down processing of the accepted message for the test binary type.
+ discoveryHook = new DiscoveryHook() {
+ @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+ DiscoveryCustomMessage customMsg = msg == null ? null
+ : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+
+ if (customMsg instanceof MetadataUpdateAcceptedMessage) {
+ if (((MetadataUpdateAcceptedMessage)customMsg).typeId() == BINARY_TYPE_ID)
+ try {
+ Thread.sleep(300);
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+ }
+ }
+ };
+
+ final IgniteEx ignite0 = startGrid(0);
+
+ // Node 1 is started without a discovery hook.
+ applyDiscoveryHook = false;
+
+ final IgniteEx ignite1 = startGrid(1);
+
+ final ErrorHolder errorHolder = new ErrorHolder();
+
+ applyDiscoveryHook = true;
+ // Hook for node 2: when version 2 of the test type is accepted, expects exactly one entry
+ // in the transport's syncMap, keyed by the test type id and version 2.
+ discoveryHook = new DiscoveryHook() {
+ private volatile IgniteEx ignite;
+
+ @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+ DiscoveryCustomMessage customMsg = msg == null ? null
+ : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+
+ if (customMsg instanceof MetadataUpdateAcceptedMessage) {
+ MetadataUpdateAcceptedMessage acceptedMsg = (MetadataUpdateAcceptedMessage)customMsg;
+ if (acceptedMsg.typeId() == BINARY_TYPE_ID && acceptedMsg.acceptedVersion() == 2) {
+ // Internal state is reached reflectively through private fields.
+ Object binaryProc = U.field(ignite.context(), "cacheObjProc");
+ Object transport = U.field(binaryProc, "transport");
+
+ try {
+ Map syncMap = U.field(transport, "syncMap");
+
+ int size = syncMap.size();
+ assertEquals("unexpected size of syncMap: ", 1, size);
+
+ Object syncKey = syncMap.keySet().iterator().next();
+
+ int typeId = U.field(syncKey, "typeId");
+ assertEquals("unexpected typeId: ", BINARY_TYPE_ID, typeId);
+
+ int ver = U.field(syncKey, "ver");
+ assertEquals("unexpected pendingVersion: ", 2, ver);
+ }
+ catch (AssertionFailedError err) {
+ // Remember the failure so that the test thread can rethrow it.
+ errorHolder.error(err);
+ }
+ }
+ }
+ }
+
+ @Override public void ignite(IgniteEx ignite) {
+ this.ignite = ignite;
+ }
+ };
+
+ final IgniteEx ignite2 = startGrid(2);
+ // The hook gets its node reference only after the node has been started.
+ discoveryHook.ignite(ignite2);
+
+ // First metadata version: int field "f1", added through node 0.
+ ignite0.executorService().submit(new Runnable() {
+ @Override public void run() {
+ addIntField(ignite0, "f1", 101, 1);
+ }
+ }).get();
+
+ UUID id2 = ignite2.localNode().id();
+
+ ClusterGroup cg2 = ignite2.cluster().forNodeId(id2);
+
+ // Second metadata version: string field "f2", added through node 1; the latch releases the reader below.
+ Future<?> fut = ignite1.executorService().submit(new Runnable() {
+ @Override public void run() {
+ LATCH1.countDown();
+ addStringField(ignite1, "f2", "str", 2);
+ }
+ });
+
+ // Reader job on node 2; its result is not awaited or checked by the test.
+ ignite2.compute(cg2).withAsync().call(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ LATCH1.await();
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+
+ Object fieldVal = ((BinaryObject) ignite2.cache(null).withKeepBinary().get(1)).field("f1");
+
+ return fieldVal;
+ }
+ });
+
+ fut.get();
+
+ // Rethrow an assertion failure captured inside the discovery hook, if any.
+ if (!errorHolder.isEmpty())
+ errorHolder.fail();
+ }
+
+    /**
+     * Verifies that all sequential updates that don't introduce any conflicts are accepted and observed by all nodes.
+     *
+     * @throws Exception If failed.
+     */
+    public void testSequentialUpdatesNoConflicts() throws Exception {
+        IgniteEx ignite0 = startGrid(0);
+
+        final IgniteEx ignite1 = startGrid(1);
+
+        final String intFieldName = "f1";
+
+        ignite1.executorService().submit(new Runnable() {
+            @Override public void run() {
+                addIntField(ignite1, intFieldName, 101, 1);
+            }
+        }).get();
+
+        // Field added through node 1 must be readable on node 0.
+        int fld = ((BinaryObject) ignite0.cache(null).withKeepBinary().get(1)).field(intFieldName);
+
+        // Expected value goes first so that a failure message is not misleading.
+        assertEquals(101, fld);
+
+        final IgniteEx ignite2 = startGrid(2);
+
+        final String strFieldName = "f2";
+
+        ignite2.executorService().submit(new Runnable() {
+            @Override public void run() {
+                addStringField(ignite2, strFieldName, "str", 2);
+            }
+        }).get();
+
+        // Field added through node 2 must be readable on node 1.
+        Object strFld = ((BinaryObject)ignite1.cache(null).withKeepBinary().get(2)).field(strFieldName);
+
+        assertEquals("str", strFld);
+    }
+
+    /**
+     * Verifies that client is able to detect obsolete metadata situation and request up-to-date from the cluster.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientRequestsUpToDateMetadata() throws Exception {
+        final IgniteEx srv0 = startGrid(0);
+
+        final IgniteEx srv1 = startGrid(1);
+
+        Runnable putIntFld = new Runnable() {
+            @Override public void run() {
+                addIntField(srv0, "f1", 101, 1);
+            }
+        };
+
+        srv0.executorService().submit(putIntFld).get();
+
+        final Ignite client = startDeafClient("client");
+
+        ClusterGroup clientGrp = client.cluster().forClients();
+
+        final String strVal = "strVal101";
+
+        Runnable putStrFld = new Runnable() {
+            @Override public void run() {
+                addStringField(srv1, "f2", strVal, 1);
+            }
+        };
+
+        srv1.executorService().submit(putStrFld).get();
+
+        // The client has to read the field introduced by the update made through the second server.
+        IgniteCallable<String> readStrFld = new IgniteCallable<String>() {
+            @Override public String call() throws Exception {
+                return ((BinaryObject)client.cache(null).withKeepBinary().get(1)).field("f2");
+            }
+        };
+
+        String res = client.compute(clientGrp).call(readStrFld);
+
+        assertEquals(strVal, res);
+    }
+
+    /**
+     * Verifies that client resends request for up-to-date metadata in case of failure on server received first request.
+     * <p>
+     * Server 0 stops itself on a metadata request, servers 1 and 2 count requests and then handle them
+     * normally; two requests in total are expected.
+     *
+     * @throws Exception If failed.
+     */
+    public void testClientRequestsUpToDateMetadataOneNodeDies() throws Exception {
+        final Ignite srv0 = startGrid(0);
+        replaceWithStoppingMappingRequestListener(((GridKernalContext)U.field(srv0, "ctx")).io(), 0);
+
+        final Ignite srv1 = startGrid(1);
+        replaceWithCountingMappingRequestListener(((GridKernalContext)U.field(srv1, "ctx")).io());
+
+        final Ignite srv2 = startGrid(2);
+        replaceWithCountingMappingRequestListener(((GridKernalContext)U.field(srv2, "ctx")).io());
+
+        final Ignite client = startDeafClient("client");
+
+        ClusterGroup clientGrp = client.cluster().forClients();
+
+        srv0.executorService().submit(new Runnable() {
+            @Override public void run() {
+                addStringField(srv0, "f2", "strVal101", 0);
+            }
+        }).get();
+
+        client.compute(clientGrp).call(new IgniteCallable<String>() {
+            @Override public String call() throws Exception {
+                return ((BinaryObject)client.cache(null).withKeepBinary().get(0)).field("f2");
+            }
+        });
+
+        // Expected value goes first so that a failure message is not misleading.
+        assertEquals(2, metadataReqsCounter.get());
+    }
+
+    /**
+     * Starts client node that does not observe <b>MetadataUpdateProposedMessage</b> and
+     * <b>MetadataUpdateAcceptedMessage</b> messages for the test binary type: its discovery hook
+     * rewrites the type id of such messages to another value before they are handled.
+     * Client mode and hook flags are reset once the node is started.
+     *
+     * @param clientName name of client node.
+     * @return Started client node.
+     * @throws Exception If node start failed.
+     */
+    private Ignite startDeafClient(String clientName) throws Exception {
+        clientMode = true;
+        applyDiscoveryHook = true;
+        discoveryHook = new DiscoveryHook() {
+            @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+                DiscoveryCustomMessage customMsg = msg == null ? null
+                    : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+
+                boolean targetsTestType;
+
+                if (customMsg instanceof MetadataUpdateProposedMessage)
+                    targetsTestType = ((MetadataUpdateProposedMessage) customMsg).typeId() == BINARY_TYPE_ID;
+                else if (customMsg instanceof MetadataUpdateAcceptedMessage)
+                    targetsTestType = ((MetadataUpdateAcceptedMessage) customMsg).typeId() == BINARY_TYPE_ID;
+                else
+                    targetsTestType = false;
+
+                // Redirect the message to another type id.
+                if (targetsTestType)
+                    GridTestUtils.setFieldValue(customMsg, "typeId", 1);
+            }
+        };
+
+        Ignite deafClient = startGrid(clientName);
+
+        clientMode = false;
+        applyDiscoveryHook = false;
+
+        return deafClient;
+    }
+
+    /**
+     * Replaces the metadata request listener with one that, for every incoming request, spawns a thread
+     * which increments the requests counter and stops the grid with the given index. The request itself
+     * is not processed any further by this listener.
+     *
+     * @param ioMgr IO manager to replace the listener in.
+     * @param nodeIdToStop Index of the grid to stop.
+     */
+    private void replaceWithStoppingMappingRequestListener(GridIoManager ioMgr, final int nodeIdToStop) {
+        final Runnable countAndStop = new Runnable() {
+            @Override public void run() {
+                metadataReqsCounter.incrementAndGet();
+                stopGrid(nodeIdToStop, true);
+            }
+        };
+
+        ioMgr.removeMessageListener(GridTopic.TOPIC_METADATA_REQ);
+
+        ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                // Node is stopped from a separate thread, not from the message handling one.
+                new Thread(countAndStop).start();
+            }
+        });
+    }
+
+    /**
+     * Wraps the metadata request listener with one that increments the requests counter and then
+     * delegates to the original listener. The listeners array is obtained reflectively from
+     * the {@code sysLsnrs} field.
+     *
+     * @param ioMgr IO manager to wrap the listener in.
+     */
+    private void replaceWithCountingMappingRequestListener(GridIoManager ioMgr) {
+        final int topicIdx = GridTopic.TOPIC_METADATA_REQ.ordinal();
+
+        GridMessageListener[] sysLsnrs = U.field(ioMgr, "sysLsnrs");
+
+        final GridMessageListener origLsnr = sysLsnrs[topicIdx];
+
+        sysLsnrs[topicIdx] = new GridMessageListener() {
+            @Override public void onMessage(UUID nodeId, Object msg) {
+                metadataReqsCounter.incrementAndGet();
+                origLsnr.onMessage(nodeId, msg);
+            }
+        };
+    }
+
+    /**
+     * Builds an object of the fixed binary type with a single integer field and puts it into
+     * the default cache (in keepBinary mode) under the given key.
+     *
+     * @param ignite Ignite.
+     * @param fieldName Field name.
+     * @param fieldVal Field value.
+     * @param cacheIdx Cache key to put the object under.
+     */
+    private void addIntField(Ignite ignite, String fieldName, int fieldVal, int cacheIdx) {
+        BinaryObjectBuilder typeBuilder = ignite.binary().builder(BINARY_TYPE_NAME);
+
+        IgniteCache<Object, Object> binCache = ignite.cache(null).withKeepBinary();
+
+        typeBuilder.setField(fieldName, fieldVal);
+
+        BinaryObject obj = typeBuilder.build();
+
+        binCache.put(cacheIdx, obj);
+    }
+
+    /**
+     * Builds an object of the fixed binary type with a single String field and puts it into
+     * the default cache (in keepBinary mode) under the given key.
+     *
+     * @param ignite Ignite.
+     * @param fieldName Field name.
+     * @param fieldVal Field value.
+     * @param cacheIdx Cache key to put the object under.
+     */
+    private void addStringField(Ignite ignite, String fieldName, String fieldVal, int cacheIdx) {
+        BinaryObjectBuilder typeBuilder = ignite.binary().builder(BINARY_TYPE_NAME);
+
+        IgniteCache<Object, Object> binCache = ignite.cache(null).withKeepBinary();
+
+        typeBuilder.setField(fieldName, fieldVal);
+
+        BinaryObject obj = typeBuilder.build();
+
+        binCache.put(cacheIdx, obj);
+    }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 91220b2..475bf8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -96,6 +96,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
import org.apache.ignite.ssl.SslContextFactory;
import org.apache.ignite.testframework.config.GridTestProperties;
@@ -110,6 +112,61 @@ public final class GridTestUtils {
/** Default busy wait sleep interval in milliseconds. */
public static final long DFLT_BUSYWAIT_SLEEP_INTERVAL = 200;
+
+
+ /**
+ * Hook that intervenes in discovery message handling and thus allows tests to make assertions
+ * or take other actions, such as skipping certain discovery messages.
+ * <p>
+ * Both methods are no-ops by default; subclasses override the ones they need.
+ */
+ public static class DiscoveryHook {
+ /**
+ * Callback for a custom discovery message. {@link DiscoverySpiListenerWrapper} calls it
+ * before forwarding the event to the wrapped listener.
+ *
+ * @param msg Custom discovery message. The wrapper passes its {@code @Nullable} argument through
+ * unchanged, so this may be {@code null}.
+ */
+ public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+ // No-op.
+ }
+
+ /**
+ * Hands an Ignite instance to the hook.
+ * NOTE(review): presumably the node the hook is attached to; callers are not visible here - confirm.
+ *
+ * @param ignite Ignite.
+ */
+ public void ignite(IgniteEx ignite) {
+ // No-op.
+ }
+ }
+
+ /**
+ * Injects {@link DiscoveryHook} into handling logic.
+ */
+ public static final class DiscoverySpiListenerWrapper implements DiscoverySpiListener {
+ /** */
+ private final DiscoverySpiListener delegate;
+
+ /** */
+ private final DiscoveryHook hook;
+
+ /**
+ * @param delegate Delegate.
+ * @param hook Hook.
+ */
+ private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate, DiscoveryHook hook) {
+ this.hook = hook;
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage spiCustomMsg) {
+ hook.handleDiscoveryMessage(spiCustomMsg);
+ delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg);
+ }
+
+ /**
+ * @param delegate Delegate.
+ * @param discoveryHook Discovery hook.
+ */
+ public static DiscoverySpiListener wrap(DiscoverySpiListener delegate, DiscoveryHook discoveryHook) {
+ return new DiscoverySpiListenerWrapper(delegate, discoveryHook);
+ }
+ }
+
/** */
private static final Map<Class<?>, String> addrs = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
index 1a348e0..a462f90 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
@@ -54,6 +54,8 @@ import org.apache.ignite.internal.binary.noncompact.BinaryObjectBuilderNonCompac
import org.apache.ignite.internal.binary.streams.BinaryHeapStreamByteOrderSelfTest;
import org.apache.ignite.internal.binary.streams.BinaryOffheapStreamByteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.BinaryObjectOffHeapUnswapTemporaryTest;
+import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataUpdatesFlowTest;
+import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryObjectMetadataExchangeMultinodeTest;
import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryObjectUserClassloaderSelfTest;
import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreBinariesDefaultMappersSelfTest;
import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreBinariesSimpleNameMappersSelfTest;
@@ -147,6 +149,8 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite {
suite.addTestSuite(GridCacheBinaryStoreBinariesSimpleNameMappersSelfTest.class);
suite.addTestSuite(GridCacheClientNodeBinaryObjectMetadataTest.class);
+ suite.addTestSuite(GridCacheBinaryObjectMetadataExchangeMultinodeTest.class);
+ suite.addTestSuite(BinaryMetadataUpdatesFlowTest.class);
suite.addTestSuite(GridCacheClientNodeBinaryObjectMetadataMultinodeTest.class);
suite.addTestSuite(IgniteBinaryMetadataUpdateChangingTopologySelfTest.class);