You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/03/30 08:57:00 UTC
[1/2] ignite git commit: IGNITE-4302 - Use custom messages to
exchange binary metadata - Fixes #1655.
Repository: ignite
Updated Branches:
refs/heads/master f056f2e39 -> 44cf1d21a
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataRequestMessage.java
new file mode 100644
index 0000000..87bf805
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataRequestMessage.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * As {@link DiscoveryCustomMessage} messages are delivered to client nodes asynchronously
+ * it is possible that server nodes send to clients some BinaryObjects the clients don't have metadata for.
+ *
+ * When client detects obsolete metadata (by checking if current version of metadata has schemaId)
+ * it requests up-to-date metadata using communication SPI.
+ *
+ * API to make a request is provided by {@link BinaryMetadataTransport#requestUpToDateMetadata(int)} method.
+ */
+public class MetadataRequestMessage implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Type ID this request is made for. */
+ private int typeId;
+
+ /**
+ * Default constructor; leaves {@code typeId} at zero until {@link #readFrom} assigns it.
+ */
+ public MetadataRequestMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param typeId Type ID.
+ */
+ MetadataRequestMessage(int typeId) {
+ this.typeId = typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt("typeId", typeId))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ typeId = reader.readInt("typeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(MetadataRequestMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 80;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 1;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** @return Type ID. */
+ public int typeId() {
+ return typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetadataRequestMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
new file mode 100644
index 0000000..bb3d3a3
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataResponseMessage.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * Carries latest version of metadata to client as a response for {@link MetadataRequestMessage}.
+ */
+public class MetadataResponseMessage implements Message {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Type ID. */
+ private int typeId;
+
+ /** Marshalled BinaryMetadata; {@code null} is accepted by {@link #binaryMetadataBytes(byte[])}. */
+ private byte[] binaryMetadataBytes;
+
+ /** Response status; not assigned by the constructors, yet dereferenced by {@link #writeTo}. */
+ private ClientResponseStatus status;
+
+ /** Default constructor. */
+ public MetadataResponseMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param typeId Type id.
+ */
+ MetadataResponseMessage(int typeId) {
+ this.typeId = typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeInt("typeId", typeId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeInt("status", status.ordinal()))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeByteArray("binMetaBytes", binaryMetadataBytes))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ typeId = reader.readInt("typeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ status = ClientResponseStatus.values()[reader.readInt("status")];
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ binaryMetadataBytes = reader.readByteArray("binMetaBytes");
+
+ if (!reader.isLastRead())
+ return false;
+ }
+
+ return reader.afterMessageRead(MetadataResponseMessage.class);
+ }
+
+ /** {@inheritDoc} */
+ @Override public short directType() {
+ return 81;
+ }
+
+ /** {@inheritDoc} */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+
+ }
+
+ /**
+ * @param bytes Binary metadata bytes; {@code null} sets the status to METADATA_NOT_FOUND, otherwise METADATA_FOUND.
+ */
+ void binaryMetadataBytes(byte[] bytes) {
+ if (bytes != null)
+ status = ClientResponseStatus.METADATA_FOUND;
+ else
+ status = ClientResponseStatus.METADATA_NOT_FOUND;
+
+ binaryMetadataBytes = bytes;
+ }
+
+ /**
+ * Marks message if any exception happened during preparing response.
+ */
+ void markErrorOnRequest() {
+ status = ClientResponseStatus.ERROR;
+ }
+
+ /**
+ * @return Type ID.
+ */
+ int typeId() {
+ return typeId;
+ }
+
+ /**
+ * @return Marshalled BinaryMetadata.
+ */
+ byte[] binaryMetadataBytes() {
+ return binaryMetadataBytes;
+ }
+
+ /**
+ * @return {@code true} if metadata was not found on server node replied with the response.
+ */
+ boolean metadataNotFound() {
+ return status == ClientResponseStatus.METADATA_NOT_FOUND;
+ }
+
+ /**
+ * Response statuses enum. Ordinals are written by {@code writeTo} and read back by {@code readFrom}, so constant order matters.
+ */
+ private enum ClientResponseStatus {
+ /** Assigned when non-null metadata bytes are set. */
+ METADATA_FOUND,
+
+ /** Assigned when {@code null} metadata bytes are set. */
+ METADATA_NOT_FOUND,
+
+ /** Assigned by {@code markErrorOnRequest()}. */
+ ERROR
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetadataResponseMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
new file mode 100644
index 0000000..ef5370e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateAcceptedMessage.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Acknowledge message for {@link MetadataUpdateProposedMessage}: see its javadoc for detailed description of protocol.
+ *
+ * As discovery messaging doesn't guarantee that a message makes only one pass across the cluster,
+ * <b>MetadataUpdateAcceptedMessage</b> can be marked as duplicated so that other nodes skip it instead of processing it.
+ */
+public class MetadataUpdateAcceptedMessage implements DiscoveryCustomMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message ID, generated randomly when the message object is created. */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** Type ID. */
+ private final int typeId;
+
+ /** Accepted version. */
+ private final int acceptedVer;
+
+ /** Duplicated flag; {@code false} until changed through {@link #duplicated(boolean)}. */
+ private boolean duplicated;
+
+ /**
+ * @param typeId Type id.
+ * @param acceptedVer Accepted version.
+ */
+ MetadataUpdateAcceptedMessage(int typeId, int acceptedVer) {
+ this.typeId = typeId;
+ this.acceptedVer = acceptedVer;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isMutable() {
+ return true;
+ }
+
+ /** @return Accepted version. */
+ int acceptedVersion() {
+ return acceptedVer;
+ }
+
+ /** @return Type ID. */
+ public int typeId() {
+ return typeId;
+ }
+
+ /** @return Duplicated flag. */
+ public boolean duplicated() {
+ return duplicated;
+ }
+
+ /**
+ * @param duplicated New value of the duplicated flag.
+ */
+ public void duplicated(boolean duplicated) {
+ this.duplicated = duplicated;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetadataUpdateAcceptedMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
new file mode 100644
index 0000000..715e668
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateProposedMessage.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.UUID;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryMetadataHandler;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * <b>MetadataUpdateProposedMessage</b> and {@link MetadataUpdateAcceptedMessage} messages make a basis for
+ * discovery-based protocol for exchanging {@link BinaryMetadata metadata} describing objects in binary format stored in Ignite caches.
+ *
+ * All interactions with binary metadata are performed through {@link BinaryMetadataHandler}
+ * interface implemented in {@link CacheObjectBinaryProcessorImpl} processor.
+ *
+ * Protocol works as follows:
+ * <ol>
+ * <li>
+ * Each thread aiming to add/update metadata sends <b>MetadataUpdateProposedMessage</b>
+ * and blocks until receiving acknowledge or reject for proposed update.
+ * </li>
+ * <li>
+ * Coordinator node checks whether proposed update is in conflict with current version of metadata
+ * for the same typeId.
+ * In case of conflict initial <b>MetadataUpdateProposedMessage</b> is marked rejected and sent to initiator.
+ * </li>
+ * <li>
+ * If there are no conflicts on coordinator, <b>pending version</b> for metadata of this typeId is bumped up by one;
+ * <b>MetadataUpdateProposedMessage</b> with <b>pending version</b> information is sent across the cluster.
+ * </li>
+ * <li>
+ * Each node on receiving non-rejected <b>MetadataUpdateProposedMessage</b> updates <b>pending version</b>
+ * for the typeId in metadata local cache.
+ * </li>
+ * <li>
+ * When <b>MetadataUpdateProposedMessage</b> finishes pass, {@link MetadataUpdateAcceptedMessage ack} is sent.
+ * Ack has the same <b>accepted version</b> as <b>pending version</b>
+ * of initial <b>MetadataUpdateProposedMessage</b> message.
+ * </li>
+ * <li>
+ * Each node on receiving <b>MetadataUpdateAcceptedMessage</b> updates accepted version for the typeId.
+ * All threads waiting for arrival of ack with this <b>accepted version</b> are unblocked.
+ * </li>
+ * </ol>
+ *
+ * If a thread on some node decides to read metadata which has ongoing update
+ * (with <b>pending version</b> strictly greater than <b>accepted version</b>)
+ * it gets blocked until {@link MetadataUpdateAcceptedMessage} arrives with <b>accepted version</b>
+ * equal to the <b>pending version</b> this metadata had at the moment when it was initially read by the thread.
+ */
+public final class MetadataUpdateProposedMessage implements DiscoveryCustomMessage {
+ /** Serial version UID. */
+ private static final long serialVersionUID = 0L;
+
+ /** Message ID, generated randomly when the message object is created. */
+ private final IgniteUuid id = IgniteUuid.randomUuid();
+
+ /** ID of node requested update. */
+ private final UUID origNodeId;
+
+ /** Metadata requested to be updated; replaceable through {@link #metadata(BinaryMetadata)}. */
+ private BinaryMetadata metadata;
+
+ /** Type ID, taken from the metadata passed to the constructor. */
+ private final int typeId;
+
+ /** Pending version. */
+ private int pendingVer;
+
+ /** Accepted version. */
+ private int acceptedVer;
+
+ /** Proposal status; SUCCESSFUL until {@link #markRejected(BinaryObjectException)} is called. */
+ private ProposalStatus status = ProposalStatus.SUCCESSFUL;
+
+ /** Error passed to {@link #markRejected(BinaryObjectException)}, if it was called. */
+ private BinaryObjectException err;
+
+ /**
+ * @param metadata {@link BinaryMetadata} requested to be updated.
+ * @param origNodeId ID of node requested update.
+ */
+ public MetadataUpdateProposedMessage(BinaryMetadata metadata, UUID origNodeId) {
+ assert origNodeId != null;
+ assert metadata != null;
+
+ this.origNodeId = origNodeId;
+
+ this.metadata = metadata;
+ typeId = metadata.typeId();
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public IgniteUuid id() {
+ return id;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Nullable @Override public DiscoveryCustomMessage ackMessage() {
+ return (status == ProposalStatus.SUCCESSFUL) ? new MetadataUpdateAcceptedMessage(typeId, pendingVer) : null;
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override public boolean isMutable() {
+ return true;
+ }
+
+ /**
+ * @param err Error caused this update to be rejected.
+ */
+ void markRejected(BinaryObjectException err) {
+ status = ProposalStatus.REJECTED;
+ this.err = err;
+ }
+
+ /**
+ * @return {@code true} if this proposal was marked rejected.
+ */
+ boolean rejected() {
+ return status == ProposalStatus.REJECTED;
+ }
+
+ /**
+ * @return Error passed to {@link #markRejected(BinaryObjectException)}, or {@code null} if it was never called.
+ */
+ BinaryObjectException rejectionError() {
+ return err;
+ }
+
+ /**
+ * @return Pending version.
+ */
+ int pendingVersion() {
+ return pendingVer;
+ }
+
+ /**
+ * @param pendingVer New pending version.
+ */
+ void pendingVersion(int pendingVer) {
+ this.pendingVer = pendingVer;
+ }
+
+ /**
+ * @return Accepted version.
+ */
+ int acceptedVersion() {
+ return acceptedVer;
+ }
+
+ /**
+ * @param acceptedVer Accepted version.
+ */
+ void acceptedVersion(int acceptedVer) {
+ this.acceptedVer = acceptedVer;
+ }
+
+ /**
+ * @return ID of the node that requested the update.
+ */
+ UUID origNodeId() {
+ return origNodeId;
+ }
+
+ /**
+ * @return Metadata carried by this message.
+ */
+ public BinaryMetadata metadata() {
+ return metadata;
+ }
+
+ /**
+ * @param metadata Metadata.
+ */
+ public void metadata(BinaryMetadata metadata) {
+ this.metadata = metadata;
+ }
+
+ /**
+ * @return Type ID.
+ */
+ public int typeId() {
+ return typeId;
+ }
+
+ /** Proposal statuses. */
+ private enum ProposalStatus {
+ /** Initial status; {@code ackMessage()} produces an ack only in this status. */
+ SUCCESSFUL,
+
+ /** Assigned by {@code markRejected()}; {@code ackMessage()} returns {@code null} in this status. */
+ REJECTED
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MetadataUpdateProposedMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
new file mode 100644
index 0000000..6c299ab
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/MetadataUpdateResult.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import org.apache.ignite.binary.BinaryObjectException;
+
+/**
+ * Represents result of metadata update or metadata read request (so it is used both by server and client nodes).
+ */
+final class MetadataUpdateResult {
+ /** Result type. */
+ private final ResultType resType;
+
+ /** Error for results created by {@link #createFailureResult}; {@code null} for the other factory methods. */
+ private final BinaryObjectException error;
+
+ /**
+ * @param resType Response type.
+ * @param error Error.
+ */
+ private MetadataUpdateResult(ResultType resType, BinaryObjectException error) {
+ this.resType = resType;
+ this.error = error;
+ }
+
+ /**
+ * @return {@code true} if the result type is REJECT.
+ */
+ boolean rejected() {
+ return resType == ResultType.REJECT;
+ }
+
+ /**
+ * @return Error this result was created with; {@code null} unless created by {@link #createFailureResult}.
+ */
+ BinaryObjectException error() {
+ return error;
+ }
+
+ /**
+ * @return New result of SUCCESS type with no error.
+ */
+ static MetadataUpdateResult createSuccessfulResult() {
+ return new MetadataUpdateResult(ResultType.SUCCESS, null);
+ }
+
+ /**
+ * @param err Error that led to request failure; asserted to be non-null.
+ */
+ static MetadataUpdateResult createFailureResult(BinaryObjectException err) {
+ assert err != null;
+
+ return new MetadataUpdateResult(ResultType.REJECT, err);
+ }
+
+ /**
+ * @return New result of UPDATE_DISABLED type with no error.
+ */
+ static MetadataUpdateResult createUpdateDisabledResult() {
+ return new MetadataUpdateResult(ResultType.UPDATE_DISABLED, null);
+ }
+
+ /**
+ * Result types.
+ */
+ private enum ResultType {
+ /**
+ * If request completed successfully.
+ */
+ SUCCESS,
+
+ /**
+ * If request was rejected for any reason.
+ */
+ REJECT,
+
+ /**
+ * If request arrived while the node was stopping.
+ */
+ UPDATE_DISABLED
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
index 6c8df14..d65ddf5 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryHandler.java
@@ -138,7 +138,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
private boolean locCache;
/** */
- private transient boolean keepBinary;
+ private boolean keepBinary;
/** */
private transient ConcurrentMap<Integer, PartitionRecovery> rcvs;
@@ -1389,6 +1389,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
out.writeBoolean(sync);
out.writeBoolean(ignoreExpired);
out.writeInt(taskHash);
+ out.writeBoolean(keepBinary);
}
/** {@inheritDoc} */
@@ -1410,6 +1411,7 @@ public class CacheContinuousQueryHandler<K, V> implements GridContinuousHandler
sync = in.readBoolean();
ignoreExpired = in.readBoolean();
taskHash = in.readInt();
+ keepBinary = in.readBoolean();
cacheId = CU.cacheId(cacheName);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
index 66c19a0..804e889 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/GridMarshallerMappingProcessor.java
@@ -23,7 +23,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.ExecutorService;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.events.DiscoveryEvent;
@@ -137,10 +136,10 @@ public class GridMarshallerMappingProcessor extends GridProcessorAdapter {
/**
* Adds a listener to be notified when mapping changes.
*
- * @param mappingUpdatedListener listener for mapping updated events.
+ * @param lsnr listener for mapping updated events.
*/
- public void addMappingUpdatedListener(MappingUpdatedListener mappingUpdatedListener) {
- mappingUpdatedLsnrs.add(mappingUpdatedListener);
+ public void addMappingUpdatedListener(MappingUpdatedListener lsnr) {
+ mappingUpdatedLsnrs.add(lsnr);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index bf44723..d37eb9f 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -64,6 +64,8 @@ import org.apache.ignite.internal.processors.cache.CacheEntryImpl;
import org.apache.ignite.internal.processors.cache.CacheIteratorConverter;
import org.apache.ignite.internal.processors.cache.DynamicCacheChangeBatch;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateAcceptedMessage;
+import org.apache.ignite.internal.processors.cache.binary.MetadataUpdateProposedMessage;
import org.apache.ignite.internal.processors.cache.distributed.near.GridNearTxLocal;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
@@ -1492,6 +1494,9 @@ public class GridServiceProcessor extends GridProcessorAdapter {
else
return;
+ if (msg instanceof MetadataUpdateProposedMessage || msg instanceof MetadataUpdateAcceptedMessage)
+ return;
+
topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
}
else
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java
index e533a70..84e37ca 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/GridTaskExecutionSelfTest.java
@@ -97,6 +97,8 @@ public class GridTaskExecutionSelfTest extends GridCommonAbstractTest {
* @throws Exception If failed.
*/
public void testJobIdCollision() throws Exception {
+ fail("Test refactoring is needed: https://issues.apache.org/jira/browse/IGNITE-4706");
+
long locId = IgniteUuid.lastLocalId();
ArrayList<IgniteFuture<Object>> futs = new ArrayList<>(2016);
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java b/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
index 0f48961..704c8f3 100644
--- a/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
+++ b/modules/core/src/test/java/org/apache/ignite/internal/binary/TestCachingMetadataHandler.java
@@ -42,4 +42,9 @@ public class TestCachingMetadataHandler implements BinaryMetadataHandler {
@Override public BinaryType metadata(int typeId) throws BinaryObjectException {
return metas.get(typeId);
}
+
+ /** {@inheritDoc} This test handler ignores both arguments and always returns {@code null}. */
+ @Override public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
new file mode 100644
index 0000000..b76279d
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatesFlowTest.java
@@ -0,0 +1,592 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.cache.event.CacheEntryListenerException;
+import javax.cache.event.CacheEntryUpdatedListener;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cache.CachePeekMode;
+import org.apache.ignite.cache.query.CacheQueryEntryEvent;
+import org.apache.ignite.cache.query.ContinuousQuery;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.binary.BinaryObjectImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.eclipse.jetty.util.ConcurrentHashSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
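+ * Tests that binary metadata updates issued concurrently from several nodes are not lost while server nodes are stopped and restarted.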
+ */
+public class BinaryMetadataUpdatesFlowTest extends GridCommonAbstractTest {
+ /** */
+ private static final String SEQ_NUM_FLD = "f0";
+
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private volatile boolean clientMode;
+
+ /** */
+ private volatile boolean applyDiscoveryHook;
+
+ /** */
+ private volatile DiscoveryHook discoveryHook;
+
+ /** */
+ private static final int UPDATES_COUNT = 5_000;
+
+ /** */
+ private static final int RESTART_DELAY = 3_000;
+
+ /** */
+ private final Queue<BinaryUpdateDescription> updatesQueue = new LinkedBlockingDeque<>(UPDATES_COUNT);
+
+ /** */
+ private static volatile BlockingDeque<Integer> srvResurrectQueue = new LinkedBlockingDeque<>(1);
+
+ /** */
+ private static final CountDownLatch START_LATCH = new CountDownLatch(1);
+
+ /** */
+ private static final CountDownLatch FINISH_LATCH_NO_CLIENTS = new CountDownLatch(5);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag0 = new AtomicBoolean(false);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag1 = new AtomicBoolean(false);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag2 = new AtomicBoolean(false);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag3 = new AtomicBoolean(false);
+
+ /** */
+ private static volatile AtomicBoolean stopFlag4 = new AtomicBoolean(false);
+
+ /** */
+ private static final String BINARY_TYPE_NAME = "TestBinaryType";
+
+ /** */
+ private static final int BINARY_TYPE_ID = 708045005;
+
+ /** {@inheritDoc} */
+ @Override protected void beforeTest() throws Exception {
+ for (int i = 0; i < UPDATES_COUNT; i++) {
+ FieldType fType = null;
+ switch (i % 4) {
+ case 0:
+ fType = FieldType.NUMBER;
+ break;
+ case 1:
+ fType = FieldType.STRING;
+ break;
+ case 2:
+ fType = FieldType.ARRAY;
+ break;
+ case 3:
+ fType = FieldType.OBJECT;
+ }
+
+ updatesQueue.add(new BinaryUpdateDescription(i, "f" + (i + 1), fType));
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ cfg.setPeerClassLoadingEnabled(false);
+
+ if (applyDiscoveryHook) {
+ final DiscoveryHook hook = discoveryHook != null ? discoveryHook : new DiscoveryHook();
+
+ TcpDiscoverySpi discoSpi = new TcpDiscoverySpi() {
+ @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+ super.setListener(GridTestUtils.DiscoverySpiListenerWrapper.wrap(lsnr, hook));
+ }
+ };
+
+ discoSpi.setHeartbeatFrequency(1000);
+
+ cfg.setDiscoverySpi(discoSpi);
+ }
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ cfg.setClientMode(clientMode);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(CacheMode.REPLICATED);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /**
+ * Starts new ignite node and submits computation job to it.
+ * @param idx Index.
+ * @param stopFlag Stop flag.
+ */
+ private void startComputation(int idx, AtomicBoolean stopFlag) throws Exception {
+ clientMode = false;
+
+ final IgniteEx ignite0 = startGrid(idx);
+
+ ClusterGroup cg = ignite0.cluster().forNodeId(ignite0.localNode().id());
+
+ ignite0.compute(cg).withAsync().call(new BinaryObjectAdder(ignite0, updatesQueue, 30, stopFlag));
+ }
+
+ /**
+ * @param idx Index.
+ * @param deafClient Deaf client.
+ * @param observedIds Observed ids.
+ */
+ private void startListening(int idx, boolean deafClient, Set<Integer> observedIds) throws Exception {
+ clientMode = true;
+
+ ContinuousQuery qry = new ContinuousQuery();
+
+ qry.setLocalListener(new CQListener(observedIds));
+
+ if (deafClient) {
+ applyDiscoveryHook = true;
+ discoveryHook = new DiscoveryHook() {
+ @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+ DiscoveryCustomMessage customMsg = msg == null ? null
+ : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+
+ if (customMsg instanceof MetadataUpdateProposedMessage) {
+ if (((MetadataUpdateProposedMessage) customMsg).typeId() == BINARY_TYPE_ID)
+ GridTestUtils.setFieldValue(customMsg, "typeId", 1);
+ }
+ else if (customMsg instanceof MetadataUpdateAcceptedMessage) {
+ if (((MetadataUpdateAcceptedMessage) customMsg).typeId() == BINARY_TYPE_ID)
+ GridTestUtils.setFieldValue(customMsg, "typeId", 1);
+ }
+ }
+ };
+
+ IgniteEx client = startGrid(idx);
+
+ client.cache(null).withKeepBinary().query(qry);
+ }
+ else {
+ applyDiscoveryHook = false;
+
+ IgniteEx client = startGrid(idx);
+
+ client.cache(null).withKeepBinary().query(qry);
+ }
+ }
+
+ /**
+ * Continuous query listener that records the sequence-number field (SEQ_NUM_FLD) of every updated entry it receives.
+ */
+ private static class CQListener implements CacheEntryUpdatedListener {
+ /** Set that collects the sequence numbers seen by this listener. */
+ private final Set<Integer> observedIds;
+
+ /**
+ * @param observedIds Set to add observed sequence numbers to.
+ */
+ CQListener(Set<Integer> observedIds) {
+ this.observedIds = observedIds;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
+ for (Object o : iterable) {
+ if (o instanceof CacheQueryEntryEvent) {
+ CacheQueryEntryEvent e = (CacheQueryEntryEvent) o;
+
+ BinaryObjectImpl val = (BinaryObjectImpl) e.getValue();
+
+ Integer seqNum = val.field(SEQ_NUM_FLD);
+
+ observedIds.add(seqNum);
+ }
+ }
+ }
+ }
+
+ /**
+ * Runs five updater nodes that are randomly stopped and restarted, then checks that exactly UPDATES_COUNT entries are in the cache.
+ */
+ public void testFlowNoConflicts() throws Exception {
+ startComputation(0, stopFlag0);
+
+ startComputation(1, stopFlag1);
+
+ startComputation(2, stopFlag2);
+
+ startComputation(3, stopFlag3);
+
+ startComputation(4, stopFlag4);
+
+ Thread killer = new Thread(new ServerNodeKiller());
+ Thread resurrection = new Thread(new ServerNodeResurrection());
+ killer.setName("node-killer-thread");
+ killer.start();
+ resurrection.setName("node-resurrection-thread");
+ resurrection.start();
+
+ START_LATCH.countDown();
+
+ while (!updatesQueue.isEmpty())
+ Thread.sleep(1000);
+
+ FINISH_LATCH_NO_CLIENTS.await();
+
+ IgniteEx ignite0 = grid(0);
+
+ IgniteCache<Object, Object> cache0 = ignite0.cache(null);
+
+ int cacheEntries = cache0.size(CachePeekMode.PRIMARY);
+
+ assertTrue("Cache cannot contain more entries than were put in it;", cacheEntries <= UPDATES_COUNT);
+
+ assertEquals("There are less than expected entries, data loss occurred;", UPDATES_COUNT, cacheEntries);
+
+ killer.interrupt();
+ resurrection.interrupt();
+ }
+
+ /**
+ * Runs the same update flow with a "deaf" and a regular client listening; NOTE(review): the observed-id sets are never asserted on.
+ */
+ public void testFlowNoConflictsWithClients() throws Exception {
+ startComputation(0, stopFlag0);
+
+ startComputation(1, stopFlag1);
+
+ startComputation(2, stopFlag2);
+
+ startComputation(3, stopFlag3);
+
+ startComputation(4, stopFlag4);
+
+ final Set<Integer> deafClientObservedIds = new ConcurrentHashSet<>();
+
+ startListening(5, true, deafClientObservedIds);
+
+ final Set<Integer> regClientObservedIds = new ConcurrentHashSet<>();
+
+ startListening(6, false, regClientObservedIds);
+
+ START_LATCH.countDown();
+
+ Thread killer = new Thread(new ServerNodeKiller());
+ Thread resurrection = new Thread(new ServerNodeResurrection());
+ killer.setName("node-killer-thread");
+ killer.start();
+ resurrection.setName("node-resurrection-thread");
+ resurrection.start();
+
+ while (!updatesQueue.isEmpty())
+ Thread.sleep(1000);
+
+ killer.interrupt();
+ resurrection.interrupt();
+ }
+
+ /**
+ * Runnable responsible for stopping (gracefully) server nodes during metadata updates process.
+ */
+ private final class ServerNodeKiller implements Runnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ Thread curr = Thread.currentThread();
+ try {
+ START_LATCH.await();
+
+ while (!curr.isInterrupted()) {
+ int idx = ThreadLocalRandom.current().nextInt(5);
+
+ AtomicBoolean stopFlag;
+
+ switch (idx) {
+ case 0:
+ stopFlag = stopFlag0;
+ break;
+ case 1:
+ stopFlag = stopFlag1;
+ break;
+ case 2:
+ stopFlag = stopFlag2;
+ break;
+ case 3:
+ stopFlag = stopFlag3;
+ break;
+ default:
+ stopFlag = stopFlag4;
+ }
+
+ stopFlag.set(true);
+
+ while (stopFlag.get())
+ Thread.sleep(10);
+
+ stopGrid(idx);
+
+ srvResurrectQueue.put(idx);
+
+ Thread.sleep(RESTART_DELAY);
+ }
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
+ * {@link Runnable} object to restart nodes killed by {@link ServerNodeKiller}.
+ */
+ private final class ServerNodeResurrection implements Runnable {
+ /** {@inheritDoc} */
+ @Override public void run() {
+ Thread curr = Thread.currentThread();
+
+ try {
+ START_LATCH.await();
+
+ while (!curr.isInterrupted()) {
+ Integer idx = srvResurrectQueue.takeFirst();
+
+ AtomicBoolean stopFlag;
+
+ switch (idx) {
+ case 0:
+ stopFlag = stopFlag0;
+ break;
+ case 1:
+ stopFlag = stopFlag1;
+ break;
+ case 2:
+ stopFlag = stopFlag2;
+ break;
+ case 3:
+ stopFlag = stopFlag3;
+ break;
+ default:
+ stopFlag = stopFlag4;
+ }
+
+ clientMode = false;
+ applyDiscoveryHook = false;
+
+ startComputation(idx, stopFlag);
+ }
+ }
+ catch (Exception ignored) {
+ // No-op.
+ }
+ }
+ }
+
+ /**
+ * Instruction for node to perform <b>add new binary object</b> action on cache in <b>keepBinary</b> mode.
+ *
+ * Instruction includes id the object should be added under, new field to add to binary schema
+ * and {@link FieldType type} of the field.
+ */
+ private static final class BinaryUpdateDescription {
+ /** */
+ private int itemId;
+
+ /** */
+ private String fieldName;
+
+ /** */
+ private FieldType fieldType;
+
+ /**
+ * @param itemId Item id.
+ * @param fieldName Field name.
+ * @param fieldType Field type.
+ */
+ private BinaryUpdateDescription(int itemId, String fieldName, FieldType fieldType) {
+ this.itemId = itemId;
+ this.fieldName = fieldName;
+ this.fieldType = fieldType;
+ }
+ }
+
+ /**
+ *
+ */
+ private enum FieldType {
+ /** */
+ NUMBER,
+
+ /** */
+ STRING,
+
+ /** */
+ ARRAY,
+
+ /** */
+ OBJECT
+ }
+
+ /**
+ * Generates random number to use when creating binary object with field of numeric {@link FieldType type}.
+ */
+ private static int getNumberFieldVal() {
+ return ThreadLocalRandom.current().nextInt(100);
+ }
+
+ /**
+ * Generates random string to use when creating binary object with field of string {@link FieldType type}.
+ */
+ private static String getStringFieldVal() {
+ return "str" + (100 + ThreadLocalRandom.current().nextInt(9));
+ }
+
+ /**
+ * Generates random array to use when creating binary object with field of array {@link FieldType type}.
+ */
+ private static byte[] getArrayFieldVal() {
+ byte[] res = new byte[3];
+ ThreadLocalRandom.current().nextBytes(res);
+ return res;
+ }
+
+ /**
+ * @param builder Builder.
+ * @param desc Descriptor with parameters of BinaryObject to build.
+ * @return BinaryObject built by provided description
+ */
+ private static BinaryObject newBinaryObject(BinaryObjectBuilder builder, BinaryUpdateDescription desc) {
+ builder.setField(SEQ_NUM_FLD, desc.itemId + 1);
+
+ switch (desc.fieldType) {
+ case NUMBER:
+ builder.setField(desc.fieldName, getNumberFieldVal());
+ break;
+ case STRING:
+ builder.setField(desc.fieldName, getStringFieldVal());
+ break;
+ case ARRAY:
+ builder.setField(desc.fieldName, getArrayFieldVal());
+ break;
+ case OBJECT:
+ builder.setField(desc.fieldName, new Object());
+ }
+
+ return builder.build();
+ }
+
+ /**
+ * Compute job executed on each node in cluster which constantly adds new entries to ignite cache
+ * according to {@link BinaryUpdateDescription descriptions} it reads from shared queue.
+ */
+ private static final class BinaryObjectAdder implements IgniteCallable<Object> {
+ /** */
+ private final IgniteEx ignite;
+
+ /** */
+ private final Queue<BinaryUpdateDescription> updatesQueue;
+
+ /** */
+ private final long timeout;
+
+ /** */
+ private final AtomicBoolean stopFlag;
+
+ /**
+ * @param ignite Ignite.
+ * @param updatesQueue Updates queue.
+ * @param timeout Timeout.
+ * @param stopFlag Stop flag.
+ */
+ BinaryObjectAdder(IgniteEx ignite, Queue<BinaryUpdateDescription> updatesQueue, long timeout, AtomicBoolean stopFlag) {
+ this.ignite = ignite;
+ this.updatesQueue = updatesQueue;
+ this.timeout = timeout;
+ this.stopFlag = stopFlag;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Object call() throws Exception {
+ START_LATCH.await();
+
+ IgniteCache<Object, Object> cache = ignite.cache(null).withKeepBinary();
+
+ BinaryUpdateDescription desc;
+
+ // Several adders drain the same queue, so isEmpty() followed by poll() can race and yield null;
+ // polling once and testing the result avoids a NullPointerException on desc.
+ while ((desc = updatesQueue.poll()) != null) {
+ BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
+
+ cache.put(desc.itemId, newBinaryObject(builder, desc));
+
+ if (stopFlag.get())
+ break;
+ else
+ Thread.sleep(timeout);
+ }
+
+ if (updatesQueue.isEmpty())
+ FINISH_LATCH_NO_CLIENTS.countDown();
+
+ stopFlag.set(false);
+
+ return null;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
new file mode 100644
index 0000000..2dcfab8
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/cache/binary/GridCacheBinaryObjectMetadataExchangeMultinodeTest.java
@@ -0,0 +1,463 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicInteger;
+import junit.framework.AssertionFailedError;
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.binary.BinaryObject;
+import org.apache.ignite.binary.BinaryObjectBuilder;
+import org.apache.ignite.cache.CacheMode;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.binary.BinaryMarshaller;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.lang.IgniteCallable;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
+import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.GridTestUtils.DiscoveryHook;
+import org.apache.ignite.testframework.GridTestUtils.DiscoverySpiListenerWrapper;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+/**
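+ * Tests the exchange of binary metadata updates and metadata requests between server and client nodes of a multi-node cluster.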
+ */
+public class GridCacheBinaryObjectMetadataExchangeMultinodeTest extends GridCommonAbstractTest {
+ /** */
+ protected static TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);
+
+ /** */
+ private boolean clientMode;
+
+ /** */
+ private boolean applyDiscoveryHook;
+
+ /** */
+ private DiscoveryHook discoveryHook;
+
+ /** */
+ private static final String BINARY_TYPE_NAME = "TestBinaryType";
+
+ /** */
+ private static final int BINARY_TYPE_ID = 708045005;
+
+ /** */
+ private static final AtomicInteger metadataReqsCounter = new AtomicInteger(0);
+
+ /** {@inheritDoc} */
+ @Override protected IgniteConfiguration getConfiguration(String gridName) throws Exception {
+ IgniteConfiguration cfg = super.getConfiguration(gridName);
+
+ if (applyDiscoveryHook) {
+ final DiscoveryHook hook = discoveryHook != null ? discoveryHook : new DiscoveryHook();
+
+ cfg.setDiscoverySpi(new TcpDiscoverySpi() {
+ @Override public void setListener(@Nullable DiscoverySpiListener lsnr) {
+ super.setListener(DiscoverySpiListenerWrapper.wrap(lsnr, hook));
+ }
+ });
+ }
+
+ ((TcpDiscoverySpi)cfg.getDiscoverySpi()).setIpFinder(ipFinder);
+
+ cfg.setMarshaller(new BinaryMarshaller());
+
+ cfg.setClientMode(clientMode);
+
+ CacheConfiguration ccfg = new CacheConfiguration();
+
+ ccfg.setCacheMode(CacheMode.REPLICATED);
+
+ cfg.setCacheConfiguration(ccfg);
+
+ return cfg;
+ }
+
+ /**
+ * Holds an {@link Error} raised inside a discovery hook so that the test thread can rethrow it later.
+ */
+ private static final class ErrorHolder {
+ /** Stored error; {@code null} until {@link #error(Error)} is called. */
+ private volatile Error e;
+
+ /**
+ * @param e Error to remember; replaces any previously stored one.
+ */
+ void error(Error e) {
+ this.e = e;
+ }
+
+ /**
+ * Rethrows the stored error; the caller in this file checks {@link #isEmpty()} first, as nothing here guards against a null error.
+ */
+ void fail() {
+ throw e;
+ }
+
+ /**
+ * @return {@code true} if no error has been stored.
+ */
+ boolean isEmpty() {
+ return e == null;
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected void afterTest() throws Exception {
+ super.afterTest();
+
+ stopAllGrids();
+ }
+
+ /** */
+ private static final CountDownLatch LATCH1 = new CountDownLatch(1);
+
+ /**
+ * Verifies that if thread tries to read metadata with ongoing update it gets blocked
+ * until acknowledge message arrives.
+ */
+ public void testReadRequestBlockedOnUpdatingMetadata() throws Exception {
+ applyDiscoveryHook = true;
+ discoveryHook = new DiscoveryHook() {
+ @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+ DiscoveryCustomMessage customMsg = msg == null ? null
+ : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+
+ if (customMsg instanceof MetadataUpdateAcceptedMessage) {
+ if (((MetadataUpdateAcceptedMessage)customMsg).typeId() == BINARY_TYPE_ID)
+ try {
+ Thread.sleep(300);
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+ }
+ }
+ };
+
+ final IgniteEx ignite0 = startGrid(0);
+
+ applyDiscoveryHook = false;
+
+ final IgniteEx ignite1 = startGrid(1);
+
+ final ErrorHolder errorHolder = new ErrorHolder();
+
+ applyDiscoveryHook = true;
+ discoveryHook = new DiscoveryHook() {
+ private volatile IgniteEx ignite;
+
+ @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+ DiscoveryCustomMessage customMsg = msg == null ? null
+ : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+
+ if (customMsg instanceof MetadataUpdateAcceptedMessage) {
+ MetadataUpdateAcceptedMessage acceptedMsg = (MetadataUpdateAcceptedMessage)customMsg;
+ if (acceptedMsg.typeId() == BINARY_TYPE_ID && acceptedMsg.acceptedVersion() == 2) {
+ Object binaryProc = U.field(ignite.context(), "cacheObjProc");
+ Object transport = U.field(binaryProc, "transport");
+
+ try {
+ Map syncMap = U.field(transport, "syncMap");
+
+ int size = syncMap.size();
+ assertEquals("unexpected size of syncMap: ", 1, size);
+
+ Object syncKey = syncMap.keySet().iterator().next();
+
+ int typeId = U.field(syncKey, "typeId");
+ assertEquals("unexpected typeId: ", BINARY_TYPE_ID, typeId);
+
+ int ver = U.field(syncKey, "ver");
+ assertEquals("unexpected pendingVersion: ", 2, ver);
+ }
+ catch (AssertionFailedError err) {
+ errorHolder.error(err);
+ }
+ }
+ }
+ }
+
+ @Override public void ignite(IgniteEx ignite) {
+ this.ignite = ignite;
+ }
+ };
+
+ final IgniteEx ignite2 = startGrid(2);
+ discoveryHook.ignite(ignite2);
+
+ ignite0.executorService().submit(new Runnable() {
+ @Override public void run() {
+ addIntField(ignite0, "f1", 101, 1);
+ }
+ }).get();
+
+ UUID id2 = ignite2.localNode().id();
+
+ ClusterGroup cg2 = ignite2.cluster().forNodeId(id2);
+
+ Future<?> fut = ignite1.executorService().submit(new Runnable() {
+ @Override public void run() {
+ LATCH1.countDown();
+ addStringField(ignite1, "f2", "str", 2);
+ }
+ });
+
+ ignite2.compute(cg2).withAsync().call(new IgniteCallable<Object>() {
+ @Override public Object call() throws Exception {
+ try {
+ LATCH1.await();
+ }
+ catch (InterruptedException ignored) {
+ // No-op.
+ }
+
+ Object fieldVal = ((BinaryObject) ignite2.cache(null).withKeepBinary().get(1)).field("f1");
+
+ return fieldVal;
+ }
+ });
+
+ fut.get();
+
+ if (!errorHolder.isEmpty())
+ errorHolder.fail();
+ }
+
+ /**
+ * Verifies that all sequential updates that don't introduce any conflicts are accepted and observed by all nodes.
+ */
+ public void testSequentialUpdatesNoConflicts() throws Exception {
+ IgniteEx ignite0 = startGrid(0);
+
+ final IgniteEx ignite1 = startGrid(1);
+
+ final String intFieldName = "f1";
+
+ ignite1.executorService().submit(new Runnable() {
+ @Override public void run() {
+ addIntField(ignite1, intFieldName, 101, 1);
+ }
+ }).get();
+
+ int fld = ((BinaryObject) ignite0.cache(null).withKeepBinary().get(1)).field(intFieldName);
+
+ assertEquals(101, fld);
+
+ final IgniteEx ignite2 = startGrid(2);
+
+ final String strFieldName = "f2";
+
+ ignite2.executorService().submit(new Runnable() {
+ @Override public void run() {
+ addStringField(ignite2, strFieldName, "str", 2);
+ }
+ }).get();
+
+ assertEquals("str", ((BinaryObject)ignite1.cache(null).withKeepBinary().get(2)).field(strFieldName));
+ }
+
+ /**
+ * Verifies that client is able to detect obsolete metadata situation and request up-to-date from the cluster.
+ */
+ public void testClientRequestsUpToDateMetadata() throws Exception {
+ final IgniteEx ignite0 = startGrid(0);
+
+ final IgniteEx ignite1 = startGrid(1);
+
+ ignite0.executorService().submit(new Runnable() {
+ @Override public void run() {
+ addIntField(ignite0, "f1", 101, 1);
+ }
+ }).get();
+
+ final Ignite client = startDeafClient("client");
+
+ ClusterGroup clientGrp = client.cluster().forClients();
+
+ final String strVal = "strVal101";
+
+ ignite1.executorService().submit(new Runnable() {
+ @Override public void run() {
+ addStringField(ignite1, "f2", strVal, 1);
+ }
+ }).get();
+
+ String res = client.compute(clientGrp).call(new IgniteCallable<String>() {
+ @Override public String call() throws Exception {
+ return ((BinaryObject)client.cache(null).withKeepBinary().get(1)).field("f2");
+ }
+ });
+
+ assertEquals(strVal, res);
+ }
+
+ /**
+ * Verifies that the client resends its request for up-to-date metadata when the server that received the first request fails.
+ */
+ public void testClientRequestsUpToDateMetadataOneNodeDies() throws Exception {
+ final Ignite srv0 = startGrid(0);
+ replaceWithStoppingMappingRequestListener(((GridKernalContext)U.field(srv0, "ctx")).io(), 0);
+
+ final Ignite srv1 = startGrid(1);
+ replaceWithCountingMappingRequestListener(((GridKernalContext)U.field(srv1, "ctx")).io());
+
+ final Ignite srv2 = startGrid(2);
+ replaceWithCountingMappingRequestListener(((GridKernalContext)U.field(srv2, "ctx")).io());
+
+ final Ignite client = startDeafClient("client");
+
+ ClusterGroup clientGrp = client.cluster().forClients();
+
+ srv0.executorService().submit(new Runnable() {
+ @Override public void run() {
+ addStringField(srv0, "f2", "strVal101", 0);
+ }
+ }).get();
+
+ client.compute(clientGrp).call(new IgniteCallable<String>() {
+ @Override public String call() throws Exception {
+ return ((BinaryObject)client.cache(null).withKeepBinary().get(0)).field("f2");
+ }
+ });
+
+ assertEquals(2, metadataReqsCounter.get());
+ }
+
+ /**
+ * Starts client node that skips <b>MetadataUpdateProposedMessage</b> and <b>MetadataUpdateAcceptedMessage</b>
+ * messages.
+ *
+ * @param clientName name of client node.
+ */
+ private Ignite startDeafClient(String clientName) throws Exception {
+ clientMode = true;
+ applyDiscoveryHook = true;
+ discoveryHook = new DiscoveryHook() {
+ @Override public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+ DiscoveryCustomMessage customMsg = msg == null ? null
+ : (DiscoveryCustomMessage) IgniteUtils.field(msg, "delegate");
+
+ if (customMsg instanceof MetadataUpdateProposedMessage) {
+ if (((MetadataUpdateProposedMessage) customMsg).typeId() == BINARY_TYPE_ID)
+ GridTestUtils.setFieldValue(customMsg, "typeId", 1);
+ }
+ else if (customMsg instanceof MetadataUpdateAcceptedMessage) {
+ if (((MetadataUpdateAcceptedMessage) customMsg).typeId() == BINARY_TYPE_ID)
+ GridTestUtils.setFieldValue(customMsg, "typeId", 1);
+ }
+ }
+ };
+
+ Ignite client = startGrid(clientName);
+
+ clientMode = false;
+ applyDiscoveryHook = false;
+
+ return client;
+ }
+
+ /**
+ * Replaces the TOPIC_METADATA_REQ listener with one that, on each message, counts the request and stops the given node in a new thread.
+ */
+ private void replaceWithStoppingMappingRequestListener(GridIoManager ioMgr, final int nodeIdToStop) {
+ ioMgr.removeMessageListener(GridTopic.TOPIC_METADATA_REQ);
+
+ ioMgr.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ new Thread(new Runnable() {
+ @Override public void run() {
+ metadataReqsCounter.incrementAndGet();
+ stopGrid(nodeIdToStop, true);
+ }
+ }).start();
+ }
+ });
+ }
+
+ /**
+ * Wraps the TOPIC_METADATA_REQ entry of the manager's {@code sysLsnrs} array so each request is counted before being delegated.
+ */
+ private void replaceWithCountingMappingRequestListener(GridIoManager ioMgr) {
+ GridMessageListener[] lsnrs = U.field(ioMgr, "sysLsnrs");
+
+ final GridMessageListener delegate = lsnrs[GridTopic.TOPIC_METADATA_REQ.ordinal()];
+
+ GridMessageListener wrapper = new GridMessageListener() {
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ metadataReqsCounter.incrementAndGet();
+ delegate.onMessage(nodeId, msg);
+ }
+ };
+
+ lsnrs[GridTopic.TOPIC_METADATA_REQ.ordinal()] = wrapper;
+ }
+
+ /**
+ * Adds field of integer type to fixed binary type.
+ *
+ * @param ignite Ignite.
+ * @param fieldName Field name.
+ * @param fieldVal Field value.
+ * @param cacheIdx Cache index.
+ */
+ private void addIntField(Ignite ignite, String fieldName, int fieldVal, int cacheIdx) {
+ BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
+
+ IgniteCache<Object, Object> cache = ignite.cache(null).withKeepBinary();
+
+ builder.setField(fieldName, fieldVal);
+
+ cache.put(cacheIdx, builder.build());
+ }
+
+ /**
+ * Adds field of String type to fixed binary type.
+ *
+ * @param ignite Ignite.
+ * @param fieldName Field name.
+ * @param fieldVal Field value.
+ * @param cacheIdx Cache index.
+ */
+ private void addStringField(Ignite ignite, String fieldName, String fieldVal, int cacheIdx) {
+ BinaryObjectBuilder builder = ignite.binary().builder(BINARY_TYPE_NAME);
+
+ IgniteCache<Object, Object> cache = ignite.cache(null).withKeepBinary();
+
+ builder.setField(fieldName, fieldVal);
+
+ cache.put(cacheIdx, builder.build());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
index 91220b2..475bf8e 100644
--- a/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
+++ b/modules/core/src/test/java/org/apache/ignite/testframework/GridTestUtils.java
@@ -96,6 +96,8 @@ import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteInClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
+import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.swapspace.inmemory.GridTestSwapSpaceSpi;
import org.apache.ignite.ssl.SslContextFactory;
import org.apache.ignite.testframework.config.GridTestProperties;
@@ -110,6 +112,61 @@ public final class GridTestUtils {
/** Default busy wait sleep interval in milliseconds. */
public static final long DFLT_BUSYWAIT_SLEEP_INTERVAL = 200;
+
+
+ /**
+ * Hook object that intervenes in discovery message handling
+ * and thus makes it possible to run assertions or other actions on discovery messages.
+ * Both methods do nothing in this base class; it is non-final, so they can be overridden.
+ * <p>
+ * NOTE(review): {@link DiscoverySpiListenerWrapper} always forwards the event to the wrapped listener
+ * after the hook returns, so a hook cannot make the listener skip a message by returning normally.
+ */
+ public static class DiscoveryHook {
+ /**
+ * Invoked by {@link DiscoverySpiListenerWrapper} before the event is passed on to the wrapped listener.
+ *
+ * @param msg Message; the wrapper passes its nullable custom message through unchecked,
+ * so this may be {@code null}.
+ */
+ public void handleDiscoveryMessage(DiscoverySpiCustomMessage msg) {
+ // No-op.
+ }
+
+ /**
+ * NOTE(review): presumably hands the node instance to the hook; the caller is not visible here - confirm.
+ *
+ * @param ignite Ignite.
+ */
+ public void ignite(IgniteEx ignite) {
+ // No-op.
+ }
+ }
+
+ /**
+ * Injects {@link DiscoveryHook} into handling logic.
+ */
+ public static final class DiscoverySpiListenerWrapper implements DiscoverySpiListener {
+ /** */
+ private final DiscoverySpiListener delegate;
+
+ /** */
+ private final DiscoveryHook hook;
+
+ /**
+ * @param delegate Delegate.
+ * @param hook Hook.
+ */
+ private DiscoverySpiListenerWrapper(DiscoverySpiListener delegate, DiscoveryHook hook) {
+ this.hook = hook;
+ this.delegate = delegate;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDiscovery(int type, long topVer, ClusterNode node, Collection<ClusterNode> topSnapshot, @Nullable Map<Long, Collection<ClusterNode>> topHist, @Nullable DiscoverySpiCustomMessage spiCustomMsg) {
+ hook.handleDiscoveryMessage(spiCustomMsg);
+ delegate.onDiscovery(type, topVer, node, topSnapshot, topHist, spiCustomMsg);
+ }
+
+ /**
+ * @param delegate Delegate.
+ * @param discoveryHook Discovery hook.
+ */
+ public static DiscoverySpiListener wrap(DiscoverySpiListener delegate, DiscoveryHook discoveryHook) {
+ return new DiscoverySpiListenerWrapper(delegate, discoveryHook);
+ }
+ }
+
/** */
private static final Map<Class<?>, String> addrs = new HashMap<>();
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
index 1a348e0..a462f90 100644
--- a/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
+++ b/modules/core/src/test/java/org/apache/ignite/testsuites/IgniteBinaryObjectsTestSuite.java
@@ -54,6 +54,8 @@ import org.apache.ignite.internal.binary.noncompact.BinaryObjectBuilderNonCompac
import org.apache.ignite.internal.binary.streams.BinaryHeapStreamByteOrderSelfTest;
import org.apache.ignite.internal.binary.streams.BinaryOffheapStreamByteOrderSelfTest;
import org.apache.ignite.internal.processors.cache.BinaryObjectOffHeapUnswapTemporaryTest;
+import org.apache.ignite.internal.processors.cache.binary.BinaryMetadataUpdatesFlowTest;
+import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryObjectMetadataExchangeMultinodeTest;
import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryObjectUserClassloaderSelfTest;
import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreBinariesDefaultMappersSelfTest;
import org.apache.ignite.internal.processors.cache.binary.GridCacheBinaryStoreBinariesSimpleNameMappersSelfTest;
@@ -147,6 +149,8 @@ public class IgniteBinaryObjectsTestSuite extends TestSuite {
suite.addTestSuite(GridCacheBinaryStoreBinariesSimpleNameMappersSelfTest.class);
suite.addTestSuite(GridCacheClientNodeBinaryObjectMetadataTest.class);
+ suite.addTestSuite(GridCacheBinaryObjectMetadataExchangeMultinodeTest.class);
+ suite.addTestSuite(BinaryMetadataUpdatesFlowTest.class);
suite.addTestSuite(GridCacheClientNodeBinaryObjectMetadataMultinodeTest.class);
suite.addTestSuite(IgniteBinaryMetadataUpdateChangingTopologySelfTest.class);
[2/2] ignite git commit: IGNITE-4302 - Use custom messages to
exchange binary metadata - Fixes #1655.
Posted by ag...@apache.org.
IGNITE-4302 - Use custom messages to exchange binary metadata - Fixes #1655.
Signed-off-by: Alexey Goncharuk <al...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/44cf1d21
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/44cf1d21
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/44cf1d21
Branch: refs/heads/master
Commit: 44cf1d21ad4363041043ad88161a954c738f20eb
Parents: f056f2e
Author: Sergey Chugunov <se...@gmail.com>
Authored: Thu Mar 30 11:40:35 2017 +0300
Committer: Alexey Goncharuk <al...@gmail.com>
Committed: Thu Mar 30 11:40:35 2017 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 5 +-
.../org/apache/ignite/internal/GridTopic.java | 5 +-
.../binary/BinaryCachingMetadataHandler.java | 6 +
.../ignite/internal/binary/BinaryContext.java | 10 +
.../ignite/internal/binary/BinaryMetadata.java | 27 +-
.../internal/binary/BinaryMetadataHandler.java | 18 +-
.../binary/BinaryNoopMetadataHandler.java | 5 +
.../internal/binary/BinaryReaderExImpl.java | 2 +-
.../ignite/internal/binary/BinaryUtils.java | 2 +-
.../communication/GridIoMessageFactory.java | 12 +
.../discovery/DiscoveryCustomMessage.java | 48 ++
.../eventstorage/GridEventStorageManager.java | 6 +-
.../cache/binary/BinaryMetadataHolder.java | 73 +++
.../cache/binary/BinaryMetadataTransport.java | 641 +++++++++++++++++++
.../binary/BinaryMetadataUpdatedListener.java | 29 +
.../binary/CacheObjectBinaryProcessor.java | 9 +
.../binary/CacheObjectBinaryProcessorImpl.java | 474 ++++----------
.../binary/ClientMetadataRequestFuture.java | 161 +++++
.../cache/binary/MetadataRequestMessage.java | 122 ++++
.../cache/binary/MetadataResponseMessage.java | 195 ++++++
.../binary/MetadataUpdateAcceptedMessage.java | 96 +++
.../binary/MetadataUpdateProposedMessage.java | 224 +++++++
.../cache/binary/MetadataUpdateResult.java | 96 +++
.../continuous/CacheContinuousQueryHandler.java | 4 +-
.../GridMarshallerMappingProcessor.java | 7 +-
.../service/GridServiceProcessor.java | 5 +
.../internal/GridTaskExecutionSelfTest.java | 2 +
.../binary/TestCachingMetadataHandler.java | 5 +
.../binary/BinaryMetadataUpdatesFlowTest.java | 592 +++++++++++++++++
...naryObjectMetadataExchangeMultinodeTest.java | 463 ++++++++++++++
.../ignite/testframework/GridTestUtils.java | 57 ++
.../IgniteBinaryObjectsTestSuite.java | 4 +
32 files changed, 3056 insertions(+), 349 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 560d7f6..dc14ae3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -49,7 +49,10 @@ public interface GridComponent {
CLUSTER_PROC,
/** */
- MARSHALLER_PROC
+ MARSHALLER_PROC,
+
+ /** */
+ BINARY_PROC
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
index 3ffc56e..6efc0d0 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridTopic.java
@@ -103,7 +103,10 @@ public enum GridTopic {
TOPIC_MAPPING_MARSH,
/** */
- TOPIC_HADOOP_MSG;
+ TOPIC_HADOOP_MSG,
+
+ /** */
+ TOPIC_METADATA_REQ;
/** Enum values. */
private static final GridTopic[] VALS = values();
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
index 39189f0..24a393d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryCachingMetadataHandler.java
@@ -67,4 +67,10 @@ public class BinaryCachingMetadataHandler implements BinaryMetadataHandler {
@Override public synchronized BinaryType metadata(int typeId) throws BinaryObjectException {
return metas.get(typeId);
}
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns {@code null} when no metadata is cached for the type ID or when the cached metadata
+ * does not contain the requested schema.
+ */
+ @Override public synchronized BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+ BinaryTypeImpl type = (BinaryTypeImpl)metas.get(typeId);
+
+ // Nothing cached for this type (or no metadata attached): report it as absent
+ // instead of failing with NullPointerException on the dereference below.
+ if (type == null || type.metadata() == null)
+ return null;
+
+ return type.metadata().hasSchema(schemaId) ? type : null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
index e5b6bda..d918aa3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryContext.java
@@ -1224,6 +1224,16 @@ public class BinaryContext {
/**
* @param typeId Type ID.
+ * @param schemaId Schema ID.
+ * @return Meta data.
+ * @throws BinaryObjectException In case of error.
+ */
+ public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+ // Without a metadata handler there is nothing to look the type up in.
+ if (metaHnd == null)
+ return null;
+
+ return metaHnd.metadata(typeId, schemaId);
+ }
+
+ /**
+ * @param typeId Type ID.
* @return Affinity key field name.
*/
public String affinityKeyFieldName(int typeId) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
index d1c79f3..393d9ee 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadata.java
@@ -28,7 +28,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-
+import java.util.Set;
import org.apache.ignite.internal.util.tostring.GridToStringInclude;
import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
@@ -60,6 +60,9 @@ public class BinaryMetadata implements Externalizable {
/** Schemas associated with type. */
private Collection<BinarySchema> schemas;
+ /** Schema IDs registered for this type. */
+ private Set<Integer> schemaIds;
+
/** Whether this is enum type. */
private boolean isEnum;
@@ -89,6 +92,16 @@ public class BinaryMetadata implements Externalizable {
this.fields = fields;
this.affKeyFieldName = affKeyFieldName;
this.schemas = schemas;
+
+ if (schemas != null) {
+ schemaIds = U.newHashSet(schemas.size());
+
+ for (BinarySchema schema : schemas)
+ schemaIds.add(schema.schemaId());
+ }
+ else
+ schemaIds = Collections.emptySet();
+
this.isEnum = isEnum;
}
@@ -145,6 +158,14 @@ public class BinaryMetadata implements Externalizable {
}
/**
+ * @param schemaId Schema ID.
+ * @return {@code true} if <b>BinaryMetadata</b> instance has schema with ID specified, {@code false} otherwise.
+ */
+ public boolean hasSchema(int schemaId) {
+ // NOTE(review): the constructor always assigns schemaIds, but the deserialization path appears to
+ // assign it only when schemas are present, which would leave it null otherwise - confirm.
+ // Treat an unassigned set as "no schemas" rather than throwing NullPointerException.
+ return schemaIds != null && schemaIds.contains(schemaId);
+ }
+
+ /**
* @return {@code True} if this is enum type.
*/
public boolean isEnum() {
@@ -249,12 +270,16 @@ public class BinaryMetadata implements Externalizable {
else {
schemas = new ArrayList<>();
+ schemaIds = U.newHashSet(schemasSize);
+
for (int i = 0; i < schemasSize; i++) {
BinarySchema schema = new BinarySchema();
schema.readFrom(in);
schemas.add(schema);
+
+ schemaIds.add(schema.schemaId());
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
index 29ff7b3..748a283 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryMetadataHandler.java
@@ -28,8 +28,8 @@ public interface BinaryMetadataHandler {
* Adds meta data.
*
* @param typeId Type ID.
- * @param meta Meta data.
- * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ * @param meta Metadata.
+ * @throws BinaryObjectException In case of error.
*/
public void addMeta(int typeId, BinaryType meta) throws BinaryObjectException;
@@ -37,8 +37,18 @@ public interface BinaryMetadataHandler {
* Gets meta data for provided type ID.
*
* @param typeId Type ID.
- * @return Meta data.
- * @throws org.apache.ignite.binary.BinaryObjectException In case of error.
+ * @return Metadata.
+ * @throws BinaryObjectException In case of error.
*/
public BinaryType metadata(int typeId) throws BinaryObjectException;
+
+ /**
+ * Gets metadata for provided type ID and schema ID.
+ *
+ * @param typeId Type ID.
+ * @param schemaId Schema ID.
+ * @return Metadata.
+ * @throws BinaryObjectException In case of error.
+ */
+ public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException;
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
index 9c0c37d..770d9c8 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryNoopMetadataHandler.java
@@ -50,4 +50,9 @@ public class BinaryNoopMetadataHandler implements BinaryMetadataHandler {
@Override public BinaryType metadata(int typeId) throws BinaryObjectException {
return null;
}
+
+ /** {@inheritDoc} */
+ @Override public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+ // Same answer as the single-argument lookup in this class: always null.
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
index f851b68..d6fefe3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryReaderExImpl.java
@@ -1984,7 +1984,7 @@ public class BinaryReaderExImpl implements BinaryReader, BinaryRawReaderEx, Bina
if (schema == null) {
if (fieldIdLen != BinaryUtils.FIELD_ID_LEN) {
- BinaryTypeImpl type = (BinaryTypeImpl)ctx.metadata(typeId);
+ BinaryTypeImpl type = (BinaryTypeImpl) ctx.metadata(typeId, schemaId);
if (type == null || type.metadata() == null)
throw new BinaryObjectException("Cannot find metadata for object with compact footer: " +
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
index 41ec078..c59b8b7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/binary/BinaryUtils.java
@@ -1562,7 +1562,7 @@ public class BinaryUtils {
Class cls;
if (typeId != GridBinaryMarshaller.UNREGISTERED_TYPE_ID)
- cls = ctx.descriptorForTypeId(true, typeId, ldr, false).describedClass();
+ cls = ctx.descriptorForTypeId(true, typeId, ldr, true).describedClass();
else {
String clsName = doReadClassName(in);
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
index 7bf3de2..737d047 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/communication/GridIoMessageFactory.java
@@ -46,6 +46,8 @@ import org.apache.ignite.internal.processors.cache.GridCacheEvictionRequest;
import org.apache.ignite.internal.processors.cache.GridCacheEvictionResponse;
import org.apache.ignite.internal.processors.cache.GridCacheReturn;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
+import org.apache.ignite.internal.processors.cache.binary.MetadataRequestMessage;
+import org.apache.ignite.internal.processors.cache.binary.MetadataResponseMessage;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTtlUpdateRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryRequest;
import org.apache.ignite.internal.processors.cache.distributed.GridCacheTxRecoveryResponse;
@@ -651,6 +653,16 @@ public class GridIoMessageFactory implements MessageFactory {
break;
+ case 80:
+ msg = new MetadataRequestMessage();
+
+ break;
+
+ case 81:
+ msg = new MetadataResponseMessage();
+
+ break;
+
case 82:
msg = new JobStealingRequest();
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
index d85075e..f908b59 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DiscoveryCustomMessage.java
@@ -19,10 +19,58 @@ package org.apache.ignite.internal.managers.discovery;
import java.io.Serializable;
import org.apache.ignite.lang.IgniteUuid;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddFinishedMessage;
+import org.apache.ignite.spi.discovery.tcp.messages.TcpDiscoveryNodeAddedMessage;
import org.jetbrains.annotations.Nullable;
/**
+ * <b>DiscoveryCustomMessage</b> messages are handled by discovery protocol which provides some guarantees around them.
*
+ * When a node sends a <b>DiscoveryCustomMessage</b> with a {@link GridDiscoveryManager#sendCustomEvent(DiscoveryCustomMessage)}
+ * call, the message first goes to the current coordinator, is verified there and only then gets sent to the cluster.
+ * Only after verification is it delivered to listeners on all nodes, starting from the coordinator.
+ *
+ * To register a listener {@link GridDiscoveryManager#setCustomEventListener(Class, CustomEventListener)} method is used.
+ *
+ * Discovery protocol guarantees include:
+ * <ol>
+ * <li>
+ * All discovery messages are observed by all nodes in exactly the same order,
+ * it is guaranteed by handling them in single-threaded mode.
+ * </li>
+ * <li>
+ * New server node joining process in default implementation involves two passes of different messages across the cluster:
+ * {@link TcpDiscoveryNodeAddedMessage} and
+ * {@link TcpDiscoveryNodeAddFinishedMessage} messages.
+ * It is guaranteed that all discovery messages observed by coordinator in between these two messages
+ * are reordered and guaranteed to be delivered to newly joined node.
+ * </li>
+ * </ol>
+ *
+ * Yet there are some features and limitations one should be aware of when using custom discovery messaging mechanism:
+ * <ol>
+ * <li>
+ * Guarantee #2 doesn't encompass <b>DiscoveryCustomMessage</b>s created automatically on
+ * {@link DiscoveryCustomMessage#ackMessage()} method call.
+ *
+ * If there were messages of this type in between <b>TcpDiscoveryNodeAddedMessage</b> and
+ * <b>TcpDiscoveryNodeAddFinishedMessage</b> messages, they won't be delivered to new joiner node.
+ * </li>
+ * <li>
+ * There is no guarantee that a given <b>DiscoveryCustomMessage</b> is delivered only once.
+ * It is possible that, because of a node failure, the node preceding the failed one will resend messages
+ * it thinks were not sent by the failed node.
+ * Duplicate messages are not filtered out on the receiver side.
+ * </li>
+ * <li>
+ * <b>DiscoveryCustomMessage</b>s are delivered to client nodes in asynchronous fashion
+ * as clients don't participate in the cluster ring.
+ * </li>
+ * <li>
+ * Any blocking operations like obtaining locks or doing I/O <b>must</b> be avoided in message handlers
+ * as they may lead to deadlocks and cluster failures.
+ * </li>
+ * </ol>
*/
public interface DiscoveryCustomMessage extends Serializable {
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
index 9194b10..77bd7d4 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/eventstorage/GridEventStorageManager.java
@@ -42,7 +42,6 @@ import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.GridTopic;
import org.apache.ignite.internal.IgniteDeploymentCheckedException;
import org.apache.ignite.internal.IgniteInternalFuture;
-import org.apache.ignite.internal.events.DiscoveryCustomEvent;
import org.apache.ignite.internal.managers.GridManagerAdapter;
import org.apache.ignite.internal.managers.communication.GridIoManager;
import org.apache.ignite.internal.managers.communication.GridMessageListener;
@@ -71,6 +70,7 @@ import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
import static org.apache.ignite.events.EventType.EVT_NODE_METRICS_UPDATED;
import static org.apache.ignite.internal.GridTopic.TOPIC_EVENT;
+import static org.apache.ignite.internal.events.DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT;
import static org.apache.ignite.internal.managers.communication.GridIoPolicy.PUBLIC_POOL;
/**
@@ -494,7 +494,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @return {@code true} if this is a system hidden event.
*/
private boolean isHiddenEvent(int type) {
- return type == EVT_NODE_METRICS_UPDATED;
+ return type == EVT_NODE_METRICS_UPDATED || type == EVT_DISCOVERY_CUSTOM_EVT;
}
/**
@@ -507,7 +507,7 @@ public class GridEventStorageManager extends GridManagerAdapter<EventStorageSpi>
* @return {@code true} if this is an internal event.
*/
private boolean isInternalEvent(int type) {
- return type == DiscoveryCustomEvent.EVT_DISCOVERY_CUSTOM_EVT || F.contains(EVTS_DISCOVERY_ALL, type);
+ return type == EVT_DISCOVERY_CUSTOM_EVT || F.contains(EVTS_DISCOVERY_ALL, type);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
new file mode 100644
index 0000000..bbc929e
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataHolder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.io.Serializable;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+
+/**
+ * Wrapper for {@link BinaryMetadata} which is stored in metadata local cache on each node.
+ * Used internally to track version counters (see javadoc for {@link MetadataUpdateProposedMessage} for more details).
+ */
+final class BinaryMetadataHolder implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Wrapped metadata; the constructor asserts that it is not {@code null}. */
+ private final BinaryMetadata metadata;
+
+ /** Pending version counter, as passed to the constructor. */
+ private final int pendingVer;
+
+ /** Accepted version counter, as passed to the constructor. */
+ private final int acceptedVer;
+
+
+ /**
+ * @param metadata Metadata, must not be {@code null} (checked with an assertion).
+ * @param pendingVer Version of this metadata - how many updates were issued for this type.
+ * @param acceptedVer Accepted version counter. NOTE(review): this was described as "Pending updates count",
+ * which does not match the parameter name - confirm the intended meaning against the
+ * {@link MetadataUpdateProposedMessage} javadoc.
+ */
+ BinaryMetadataHolder(BinaryMetadata metadata, int pendingVer, int acceptedVer) {
+ assert metadata != null;
+
+ this.metadata = metadata;
+ this.pendingVer = pendingVer;
+ this.acceptedVer = acceptedVer;
+ }
+
+ /**
+ * @return Wrapped metadata.
+ */
+ BinaryMetadata metadata() {
+ return metadata;
+ }
+
+ /**
+ * @return Pending version counter this holder was created with.
+ */
+ int pendingVersion() {
+ return pendingVer;
+ }
+
+ /**
+ * @return Accepted version counter this holder was created with.
+ */
+ int acceptedVersion() {
+ return acceptedVer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
new file mode 100644
index 0000000..770fa32
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataTransport.java
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.events.DiscoveryEvent;
+import org.apache.ignite.events.Event;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.binary.BinaryMetadata;
+import org.apache.ignite.internal.binary.BinaryUtils;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridMessageListener;
+import org.apache.ignite.internal.managers.discovery.CustomEventListener;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
+import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+import org.jsr166.ConcurrentHashMap8;
+
+import static org.apache.ignite.events.EventType.EVT_NODE_FAILED;
+import static org.apache.ignite.events.EventType.EVT_NODE_LEFT;
+import static org.apache.ignite.internal.managers.communication.GridIoPolicy.SYSTEM_POOL;
+
+/**
+ * Provides API for discovery-based metadata exchange protocol and communication SPI-based metadata request protocol.
+ *
+ * It is responsible for sending update and metadata requests and manages message listeners for them.
+ *
+ * It also manages synchronization logic (blocking/unblocking threads requesting updates or up-to-date metadata etc)
+ * around protocols.
+ */
+final class BinaryMetadataTransport {
+ /** */
+ private final GridDiscoveryManager discoMgr;
+
+ /** */
+ private final GridKernalContext ctx;
+
+ /** */
+ private final IgniteLogger log;
+
+ /** */
+ private final UUID locNodeId;
+
+ /** */
+ private final boolean clientNode;
+
+ /** */
+ private final ConcurrentMap<Integer, BinaryMetadataHolder> metaLocCache;
+
+ /** */
+ private final Queue<MetadataUpdateResultFuture> unlabeledFutures = new ConcurrentLinkedQueue<>();
+
+ /** */
+ private final ConcurrentMap<SyncKey, MetadataUpdateResultFuture> syncMap = new ConcurrentHashMap8<>();
+
+ /** */
+ private final ConcurrentMap<Integer, ClientMetadataRequestFuture> clientReqSyncMap = new ConcurrentHashMap8<>();
+
+ /** */
+ private volatile boolean stopping;
+
+ /** */
+ private final List<BinaryMetadataUpdatedListener> binaryUpdatedLsnrs = new CopyOnWriteArrayList<>();
+
+ /**
+ * Creates the transport and registers its discovery, communication and (on client nodes) local event listeners.
+ *
+ * @param metaLocCache Metadata local cache.
+ * @param ctx Context.
+ * @param log Logger.
+ */
+ BinaryMetadataTransport(ConcurrentMap<Integer, BinaryMetadataHolder> metaLocCache, final GridKernalContext ctx, IgniteLogger log) {
+ this.metaLocCache = metaLocCache;
+ this.ctx = ctx;
+ this.log = log;
+
+ discoMgr = ctx.discovery();
+ locNodeId = ctx.localNodeId();
+ clientNode = ctx.clientNode();
+
+ // Both kinds of update messages arrive as discovery custom events.
+ discoMgr.setCustomEventListener(MetadataUpdateProposedMessage.class, new MetadataUpdateProposedListener());
+ discoMgr.setCustomEventListener(MetadataUpdateAcceptedMessage.class, new MetadataUpdateAcceptedListener());
+
+ GridIoManager io = ctx.io();
+
+ if (!clientNode) {
+ // Non-client nodes answer metadata requests and need nothing else.
+ io.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new MetadataRequestListener(io));
+
+ return;
+ }
+
+ // Client nodes receive metadata responses on the same topic.
+ io.addMessageListener(GridTopic.TOPIC_METADATA_REQ, new MetadataResponseListener());
+
+ // Pending client requests are told about every node that left or failed.
+ ctx.event().addLocalEventListener(new GridLocalEventListener() {
+ @Override public void onEvent(Event evt) {
+ DiscoveryEvent discoEvt = (DiscoveryEvent)evt;
+
+ if (ctx.isStopping())
+ return;
+
+ for (ClientMetadataRequestFuture reqFut : clientReqSyncMap.values())
+ reqFut.onNodeLeft(discoEvt.eventNode().id());
+ }
+ }, EVT_NODE_LEFT, EVT_NODE_FAILED);
+ }
+
+ /**
+ * Registers a {@link BinaryMetadataUpdatedListener listener} that this transport notifies about binary metadata updates.
+ *
+ * @param lsnr Listener to register.
+ */
+ void addBinaryMetadataUpdateListener(BinaryMetadataUpdatedListener lsnr) {
+ binaryUpdatedLsnrs.add(lsnr);
+ }
+
+ /**
+ * Sends request to cluster proposing update for given metadata.
+ *
+ * The result future is queued in {@code unlabeledFutures} before the proposal is sent; it is taken from
+ * the queue again when a {@link MetadataUpdateProposedMessage} originated by the local node is processed.
+ * If sending fails, the future is removed from the queue so that it cannot be paired with a later proposal.
+ *
+ * @param metadata Metadata proposed for update.
+ * @return Future to wait for update result on.
+ * @throws IgniteCheckedException If the proposal could not be sent.
+ */
+ GridFutureAdapter<MetadataUpdateResult> requestMetadataUpdate(BinaryMetadata metadata) throws IgniteCheckedException {
+ MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture();
+
+ synchronized (this) {
+ unlabeledFutures.add(resFut);
+
+ if (!stopping) {
+ try {
+ discoMgr.sendCustomEvent(new MetadataUpdateProposedMessage(metadata, locNodeId));
+ }
+ catch (IgniteCheckedException | RuntimeException e) {
+ // The send failed, so presumably no proposal will come back for this future. Leaving it queued
+ // would shift the FIFO pairing of every later proposal from this node by one.
+ unlabeledFutures.remove(resFut);
+
+ throw e;
+ }
+ }
+ else
+ resFut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
+ }
+
+ return resFut;
+ }
+
+ /**
+ * Allows thread to wait for a metadata of given typeId and version to be accepted by the cluster.
+ *
+ * @param typeId ID of binary type.
+ * @param ver Version of given binary type (see {@link MetadataUpdateProposedMessage} javadoc for more details).
+ * @return Future to wait for update result on; completed right away if the cached accepted version is already at least {@code ver}.
+ */
+ GridFutureAdapter<MetadataUpdateResult> awaitMetadataUpdate(int typeId, int ver) {
+ SyncKey key = new SyncKey(typeId, ver);
+ MetadataUpdateResultFuture resFut = new MetadataUpdateResultFuture(key);
+ // Reuse the future already registered for this (typeId, version) pair, if there is one.
+ MetadataUpdateResultFuture oldFut = syncMap.putIfAbsent(key, resFut);
+
+ if (oldFut != null)
+ resFut = oldFut;
+ // The local cache is read only after the future has been registered in syncMap.
+ BinaryMetadataHolder holder = metaLocCache.get(typeId);
+ // NOTE(review): holder is dereferenced without a null check - confirm an entry for typeId always exists here.
+ if (holder.acceptedVersion() >= ver)
+ resFut.onDone(MetadataUpdateResult.createSuccessfulResult());
+
+ return resFut;
+ }
+
+ /**
+ * Lets a client node ask the cluster for the latest binary metadata of the given type, for the case
+ * when the client detects that the metadata in its local cache is obsolete.
+ *
+ * At most one request future is registered per type ID; concurrent callers share it.
+ *
+ * @param typeId ID of binary type.
+ * @return Future to wait for request arrival on.
+ */
+ GridFutureAdapter<MetadataUpdateResult> requestUpToDateMetadata(int typeId) {
+ ClientMetadataRequestFuture reqFut = new ClientMetadataRequestFuture(ctx, typeId, clientReqSyncMap);
+
+ ClientMetadataRequestFuture registered = clientReqSyncMap.putIfAbsent(typeId, reqFut);
+
+ if (registered == null) {
+ // This future won the registration, so it is the one that issues the request.
+ reqFut.requestMetadata();
+
+ return reqFut;
+ }
+
+ return registered;
+ }
+
+ /** Marks the transport as stopping and completes all tracked futures with an "update disabled" result. */
+ void stop() {
+ stopping = true;
+
+ cancelFutures(MetadataUpdateResult.createUpdateDisabledResult());
+ }
+
+ /** Completes all tracked futures with a failure result reporting that the client node disconnected. */
+ void onDisconnected() {
+ BinaryObjectException err = new BinaryObjectException("Failed to update or wait for metadata, client node disconnected");
+
+ cancelFutures(MetadataUpdateResult.createFailureResult(err));
+ }
+
+ /**
+ * Completes every future tracked by this transport with the given result.
+ *
+ * @param res Result to complete the futures with.
+ */
+ private void cancelFutures(MetadataUpdateResult res) {
+ // Update requests still waiting in the queue for their own proposal.
+ for (MetadataUpdateResultFuture updFut : unlabeledFutures)
+ updFut.onDone(res);
+
+ // Futures registered under a (typeId, version) key.
+ for (MetadataUpdateResultFuture syncFut : syncMap.values())
+ syncFut.onDone(res);
+
+ // Outstanding client metadata requests.
+ for (ClientMetadataRequestFuture reqFut : clientReqSyncMap.values())
+ reqFut.onDone(res);
+ }
+
+ /** Discovery listener for {@link MetadataUpdateProposedMessage}: assigns versions to new proposals and applies them to the local cache. */
+ private final class MetadataUpdateProposedListener implements CustomEventListener<MetadataUpdateProposedMessage> {
+
+ /** {@inheritDoc} */
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateProposedMessage msg) {
+ int typeId = msg.typeId();
+
+ BinaryMetadataHolder holder = metaLocCache.get(typeId);
+
+ int pendingVer;
+ int acceptedVer;
+ // Pending version 0 is treated as "versions not assigned yet": they are derived from the local holder and stored into the message.
+ if (msg.pendingVersion() == 0) {
+ if (holder != null) {
+ pendingVer = holder.pendingVersion() + 1;
+ acceptedVer = holder.acceptedVersion();
+ }
+ else {
+ pendingVer = 1;
+ acceptedVer = 0;
+ }
+
+ msg.pendingVersion(pendingVer);
+ msg.acceptedVersion(acceptedVer);
+ // Merge the proposed metadata with the local one; a merge failure marks the proposal as rejected.
+ BinaryMetadata locMeta = holder != null ? holder.metadata() : null;
+
+ try {
+ BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata());
+
+ msg.metadata(mergedMeta);
+ }
+ catch (BinaryObjectException err) {
+ log.warning("Exception with merging metadata for typeId: " + typeId, err);
+
+ msg.markRejected(err);
+ }
+ }
+ else {
+ pendingVer = msg.pendingVersion();
+ acceptedVer = msg.acceptedVersion();
+ }
+ // Proposal originated on this node: take the oldest queued request future and complete or register it.
+ if (locNodeId.equals(msg.origNodeId())) {
+ MetadataUpdateResultFuture fut = unlabeledFutures.poll();
+ // NOTE(review): poll() returns null for an empty queue and fut is used unchecked - confirm the queue cannot be empty here.
+ if (msg.rejected())
+ fut.onDone(MetadataUpdateResult.createFailureResult(msg.rejectionError()));
+ else {
+ if (clientNode) {
+ BinaryMetadataHolder newHolder = new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer);
+
+ holder = metaLocCache.putIfAbsent(typeId, newHolder);
+ // A holder is already cached: swap it for the new one unless the arriving update is obsolete.
+ if (holder != null) {
+ boolean obsoleteUpd = false;
+
+ do {
+ holder = metaLocCache.get(typeId);
+
+ if (obsoleteUpdate(
+ holder.pendingVersion(),
+ holder.acceptedVersion(),
+ pendingVer,
+ acceptedVer)) {
+ obsoleteUpd = true;
+
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+
+ break;
+ }
+ }
+ while (!metaLocCache.replace(typeId, holder, newHolder));
+
+ if (!obsoleteUpd)
+ initSyncFor(typeId, pendingVer, fut);
+ }
+ else
+ initSyncFor(typeId, pendingVer, fut);
+ }
+ else {
+ initSyncFor(typeId, pendingVer, fut);
+
+ metaLocCache.put(typeId, new BinaryMetadataHolder(msg.metadata(), pendingVer, acceptedVer));
+ }
+ }
+ }
+ else {
+ if (!msg.rejected()) {
+ BinaryMetadata locMeta = holder != null ? holder.metadata() : null;
+ // Not the originator: merge the proposal with the local metadata and cache the merged holder.
+ try {
+ BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(locMeta, msg.metadata());
+
+ BinaryMetadataHolder newHolder = new BinaryMetadataHolder(mergedMeta, pendingVer, acceptedVer);
+
+ if (clientNode) {
+ holder = metaLocCache.putIfAbsent(typeId, newHolder);
+
+ if (holder != null) {
+ do {
+ holder = metaLocCache.get(typeId);
+
+ if (obsoleteUpdate(
+ holder.pendingVersion(),
+ holder.acceptedVersion(),
+ pendingVer,
+ acceptedVer))
+ break;
+
+ } while (!metaLocCache.replace(typeId, holder, newHolder));
+ }
+ }
+ else
+ metaLocCache.put(typeId, newHolder);
+ }
+ catch (BinaryObjectException ignored) {
+ assert false : msg;
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Registers the future in {@code syncMap} under the (typeId, pendingVer) key and tells the future its key,
+ * or completes it with an "update disabled" result when the transport is stopping.
+ *
+ * @param typeId Type ID.
+ * @param pendingVer Pending version.
+ * @param fut Future.
+ */
+ private void initSyncFor(int typeId, int pendingVer, MetadataUpdateResultFuture fut) {
+ if (!stopping) {
+ SyncKey syncKey = new SyncKey(typeId, pendingVer);
+
+ syncMap.put(syncKey, fut);
+
+ fut.key(syncKey);
+ }
+ else
+ fut.onDone(MetadataUpdateResult.createUpdateDisabledResult());
+ }
+
+ /**
+ * Discovery listener for {@link MetadataUpdateAcceptedMessage}: records the accepted version in the local cache,
+ * notifies registered {@link BinaryMetadataUpdatedListener}s and completes the future waiting for that version.
+ */
+ private final class MetadataUpdateAcceptedListener implements CustomEventListener<MetadataUpdateAcceptedMessage> {
+
+ /** {@inheritDoc} */
+ @Override public void onCustomEvent(AffinityTopologyVersion topVer, ClusterNode snd, MetadataUpdateAcceptedMessage msg) {
+ if (msg.duplicated())
+ return;
+
+ int typeId = msg.typeId();
+
+ BinaryMetadataHolder holder = metaLocCache.get(typeId);
+
+ assert holder != null : "No metadata found for typeId " + typeId;
+
+ int newAcceptedVer = msg.acceptedVersion();
+
+ if (clientNode) {
+ BinaryMetadataHolder newHolder;
+
+ do {
+ holder = metaLocCache.get(typeId);
+
+ int oldAcceptedVer = holder.acceptedVersion();
+
+ if (oldAcceptedVer > newAcceptedVer)
+ break;
+
+ // Build the replacement from the holder that was just read. Building it once before the loop
+ // would write stale metadata and pending version back if another thread swapped the holder.
+ newHolder = new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer);
+ }
+ while (!metaLocCache.replace(typeId, holder, newHolder));
+ }
+ else {
+ int oldAcceptedVer = holder.acceptedVersion();
+
+ if (oldAcceptedVer >= newAcceptedVer) {
+ // This is a duplicate ack: flag the message and skip listeners and futures.
+ msg.duplicated(true);
+
+ return;
+ }
+
+ metaLocCache.put(typeId, new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), newAcceptedVer));
+ }
+
+ for (BinaryMetadataUpdatedListener lsnr : binaryUpdatedLsnrs)
+ lsnr.binaryMetadataUpdated(holder.metadata());
+
+ GridFutureAdapter<MetadataUpdateResult> fut = syncMap.get(new SyncKey(typeId, newAcceptedVer));
+
+ if (fut != null)
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+ }
+ }
+
+ /**
+ * Future used to block threads until a particular metadata update event happens,
+ * e.g. arriving {@link MetadataUpdateAcceptedMessage} acknowledgment or {@link MetadataResponseMessage} response.
+ * Once completed, it removes itself from {@code syncMap} if it knows the key it was registered under.
+ */
+ private final class MetadataUpdateResultFuture extends GridFutureAdapter<MetadataUpdateResult> {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** Key in syncMap this future is registered under, {@code null} while it is not registered. */
+ private SyncKey key;
+
+ /** Creates a future that has no syncMap key yet. */
+ MetadataUpdateResultFuture() {
+ // No-op.
+ }
+
+ /**
+ * @param key key in syncMap this future was added under.
+ */
+ MetadataUpdateResultFuture(SyncKey key) {
+ this.key = key;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean onDone(@Nullable MetadataUpdateResult res, @Nullable Throwable err) {
+ assert res != null;
+
+ if (!super.onDone(res, err))
+ return false;
+
+ // Only the call that actually completed the future cleans up the registration.
+ if (key != null)
+ syncMap.remove(key, this);
+
+ return true;
+ }
+
+ /**
+ * @param key Key in syncMap this future was added under.
+ */
+ void key(SyncKey key) {
+ this.key = key;
+ }
+ }
+
+ /**
+ * Key for mapping arriving {@link MetadataUpdateAcceptedMessage} messages
+ * to {@link MetadataUpdateResultFuture}s other threads may be waiting on.
+ * Two keys are equal when both the type ID and the version match.
+ */
+ private static final class SyncKey {
+ /** Binary type ID. */
+ private final int typeId;
+
+ /** Metadata version. */
+ private final int ver;
+
+ /**
+ * @param typeId Type id.
+ * @param ver Version.
+ */
+ private SyncKey(int typeId, int ver) {
+ this.typeId = typeId;
+ this.ver = ver;
+ }
+
+ /**
+ * @return Type ID.
+ */
+ int typeId() {
+ return typeId;
+ }
+
+ /**
+ * @return Version.
+ */
+ int version() {
+ return ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ // A plain sum would make (typeId, ver) and (typeId + 1, ver - 1) hash to the same value.
+ return 31 * typeId + ver;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object o) {
+ if (o == this)
+ return true;
+
+ if (!(o instanceof SyncKey))
+ return false;
+
+ SyncKey that = (SyncKey) o;
+
+ return (typeId == that.typeId) && (ver == that.ver);
+ }
+ }
+
+ /**
+ * Listener registered on non-client nodes that answers metadata requests with the locally cached holder.
+ */
+ private final class MetadataRequestListener implements GridMessageListener {
+ /** IO manager used to send responses. */
+ private final GridIoManager ioMgr;
+
+ /**
+ * @param ioMgr IO manager.
+ */
+ MetadataRequestListener(GridIoManager ioMgr) {
+ this.ioMgr = ioMgr;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof MetadataRequestMessage : msg;
+
+ int typeId = ((MetadataRequestMessage)msg).typeId();
+
+ BinaryMetadataHolder cached = metaLocCache.get(typeId);
+
+ MetadataResponseMessage resp = new MetadataResponseMessage(typeId);
+
+ // Stays null when nothing is cached for the type or when marshalling fails.
+ byte[] holderBytes = null;
+
+ if (cached != null) {
+ try {
+ holderBytes = U.marshal(ctx, cached);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal binary metadata for [typeId: " + typeId + "]", e);
+
+ resp.markErrorOnRequest();
+ }
+ }
+
+ resp.binaryMetadataBytes(holderBytes);
+
+ // The response goes back to the requesting node on the request topic.
+ try {
+ ioMgr.sendToGridTopic(nodeId, GridTopic.TOPIC_METADATA_REQ, resp, SYSTEM_POOL);
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to send up-to-date metadata response.", e);
+ }
+ }
+ }
+
+ /**
+ * Listener is registered on each client node and listens for metadata responses from cluster.
+ */
+ private final class MetadataResponseListener implements GridMessageListener {
+
+ /** {@inheritDoc} */
+ @Override public void onMessage(UUID nodeId, Object msg) {
+ assert msg instanceof MetadataResponseMessage : msg;
+
+ MetadataResponseMessage msg0 = (MetadataResponseMessage) msg;
+
+ int typeId = msg0.typeId();
+
+ byte[] binMetaBytes = msg0.binaryMetadataBytes();
+
+ ClientMetadataRequestFuture fut = clientReqSyncMap.get(typeId);
+ // No request future is registered for this type, so the response is ignored.
+ if (fut == null)
+ return;
+ // The response is flagged as "metadata not found": complete the request without touching the local cache.
+ if (msg0.metadataNotFound()) {
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+
+ return;
+ }
+
+ try {
+ BinaryMetadataHolder newHolder = U.unmarshal(ctx, binMetaBytes, U.resolveClassLoader(ctx.config()));
+
+ BinaryMetadataHolder oldHolder = metaLocCache.putIfAbsent(typeId, newHolder);
+ // A holder is already cached: replace it unless the received one is obsolete.
+ if (oldHolder != null) {
+ do {
+ oldHolder = metaLocCache.get(typeId);
+ // NOTE(review): if this re-read returns null, replace() below gets a null expected value - confirm this cannot happen.
+ if (oldHolder != null && obsoleteUpdate(
+ oldHolder.pendingVersion(),
+ oldHolder.acceptedVersion(),
+ newHolder.pendingVersion(),
+ newHolder.acceptedVersion()))
+ break;
+ }
+ while (!metaLocCache.replace(typeId, oldHolder, newHolder));
+ }
+
+ fut.onDone(MetadataUpdateResult.createSuccessfulResult());
+ }
+ catch (IgniteCheckedException e) {
+ fut.onDone(MetadataUpdateResult.createFailureResult(new BinaryObjectException(e)));
+ }
+ }
+
+
+ }
+
+ /**
+ * Checks whether arrived metadata is obsolete compared to the one from the local cache.
+ * Pending versions are compared first; accepted versions only break a tie.
+ *
+ * @param locP pendingVersion of metadata from local cache.
+ * @param locA acceptedVersion of metadata from local cache.
+ * @param remP pendingVersion of metadata from arrived message (client response/proposed/accepted).
+ * @param remA acceptedVersion of metadata from arrived message (client response/proposed/accepted).
+ * @return {@code true} if the arrived metadata is older than the locally cached one, {@code false} otherwise.
+ */
+ private static boolean obsoleteUpdate(int locP, int locA, int remP, int remA) {
+ if (remP != locP)
+ return remP < locP;
+
+ return remA < locA;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatedListener.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatedListener.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatedListener.java
new file mode 100644
index 0000000..08e879a
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/BinaryMetadataUpdatedListener.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import org.apache.ignite.internal.binary.BinaryMetadata;
+
+/**
+ * Listener interface that allows any component to be notified about binary metadata updates.
+ */
+public interface BinaryMetadataUpdatedListener {
+ /**
+ * @param metadata Metadata the update notification is about.
+ */
+ public void binaryMetadataUpdated(BinaryMetadata metadata);
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
index 7cc7a29..1564ff3 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessor.java
@@ -71,6 +71,15 @@ public interface CacheObjectBinaryProcessor extends IgniteCacheObjectProcessor {
*/
@Nullable public BinaryType metadata(int typeId) throws IgniteException;
+
+ /**
+ * @param typeId Type ID.
+ * @param schemaId Schema ID.
+ * @return Meta data.
+ * @throws IgniteException In case of error.
+ */
+ @Nullable public BinaryType metadata(int typeId, int schemaId) throws IgniteException;
+
/**
* @param typeIds Type ID.
* @return Meta data.
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
index 6e5940f..50a9f44 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/CacheObjectBinaryProcessorImpl.java
@@ -17,25 +17,15 @@
package org.apache.ignite.internal.processors.cache.binary;
-import java.io.Externalizable;
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
+import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CountDownLatch;
-import javax.cache.Cache;
+import java.util.concurrent.ConcurrentMap;
import javax.cache.CacheException;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import javax.cache.event.CacheEntryUpdatedListener;
-import javax.cache.event.EventType;
-import javax.cache.processor.EntryProcessor;
-import javax.cache.processor.MutableEntry;
import org.apache.ignite.IgniteBinary;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -45,9 +35,7 @@ import org.apache.ignite.binary.BinaryObjectBuilder;
import org.apache.ignite.binary.BinaryObjectException;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.binary.BinaryTypeConfiguration;
-import org.apache.ignite.cache.CacheEntryEventSerializableFilter;
import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.BinaryConfiguration;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.Event;
@@ -68,45 +56,37 @@ import org.apache.ignite.internal.binary.GridBinaryMarshaller;
import org.apache.ignite.internal.binary.builder.BinaryObjectBuilderImpl;
import org.apache.ignite.internal.binary.streams.BinaryInputStream;
import org.apache.ignite.internal.binary.streams.BinaryOffheapInputStream;
-import org.apache.ignite.internal.cluster.ClusterTopologyCheckedException;
import org.apache.ignite.internal.managers.eventstorage.GridLocalEventListener;
-import org.apache.ignite.internal.processors.affinity.AffinityTopologyVersion;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicate;
-import org.apache.ignite.internal.processors.cache.CacheEntryPredicateAdapter;
import org.apache.ignite.internal.processors.cache.CacheObject;
import org.apache.ignite.internal.processors.cache.CacheObjectContext;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
-import org.apache.ignite.internal.processors.cache.GridCacheEntryEx;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
-import org.apache.ignite.internal.processors.cache.IgniteCacheProxy;
import org.apache.ignite.internal.processors.cache.KeyCacheObject;
-import org.apache.ignite.internal.processors.cache.query.CacheQuery;
-import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cacheobject.IgniteCacheObjectProcessorImpl;
import org.apache.ignite.internal.util.GridUnsafe;
import org.apache.ignite.internal.util.IgniteUtils;
-import org.apache.ignite.internal.util.lang.GridCloseableIterator;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
import org.apache.ignite.internal.util.lang.GridMapEntry;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
-import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T1;
import org.apache.ignite.internal.util.typedef.T2;
-import org.apache.ignite.internal.util.typedef.X;
-import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.typedef.internal.S;
import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgniteClosure;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteProductVersion;
import org.apache.ignite.marshaller.Marshaller;
import org.apache.ignite.spi.IgniteNodeValidationResult;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
import org.jetbrains.annotations.Nullable;
import org.jsr166.ConcurrentHashMap8;
import static org.apache.ignite.IgniteSystemProperties.IGNITE_SKIP_CONFIGURATION_CONSISTENCY_CHECK;
import static org.apache.ignite.IgniteSystemProperties.getBoolean;
import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.BINARY_PROC;
/**
* Binary processor implementation.
@@ -114,25 +94,7 @@ import static org.apache.ignite.events.EventType.EVT_CLIENT_NODE_DISCONNECTED;
public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorImpl implements
CacheObjectBinaryProcessor {
/** */
- private final CountDownLatch startLatch = new CountDownLatch(1);
-
- /** */
- private final boolean clientNode;
-
- /** */
- private volatile IgniteCacheProxy<BinaryMetadataKey, BinaryMetadata> metaDataCache;
-
- /** */
- private final ConcurrentHashMap8<Integer, BinaryTypeImpl> clientMetaDataCache;
-
- /** Predicate to filter binary meta data in utility cache. */
- private final CacheEntryPredicate metaPred = new CacheEntryPredicateAdapter() {
- private static final long serialVersionUID = 0L;
-
- @Override public boolean apply(GridCacheEntryEx e) {
- return e.key().value(e.context().cacheObjectContext(), false) instanceof BinaryMetadataKey;
- }
- };
+ private volatile boolean discoveryStarted;
/** */
private BinaryContext binaryCtx;
@@ -151,11 +113,16 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
private final GridLocalEventListener clientDisconLsnr = new GridLocalEventListener() {
@Override public void onEvent(Event evt) {
binaryContext().unregisterBinarySchemas();
+
+ metadataLocCache.clear();
}
};
- /** Metadata updates collected before metadata cache is initialized. */
- private final Map<Integer, BinaryMetadata> metaBuf = new ConcurrentHashMap<>();
+ /** Locally cached metadata. This local cache is managed by exchanging discovery custom events. */
+ private final ConcurrentMap<Integer, BinaryMetadataHolder> metadataLocCache = new ConcurrentHashMap8<>();
+
+ /** */
+ private BinaryMetadataTransport transport;
/** Cached affinity key field names. */
private final ConcurrentHashMap<Integer, T1<BinaryField>> affKeyFields = new ConcurrentHashMap<>();
@@ -167,10 +134,6 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
super(ctx);
marsh = ctx.grid().configuration().getMarshaller();
-
- clientNode = this.ctx.clientNode();
-
- clientMetaDataCache = clientNode ? new ConcurrentHashMap8<Integer, BinaryTypeImpl>() : null;
}
/** {@inheritDoc} */
@@ -179,47 +142,38 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
if (ctx.clientNode())
ctx.event().addLocalEventListener(clientDisconLsnr, EVT_CLIENT_NODE_DISCONNECTED);
+ transport = new BinaryMetadataTransport(metadataLocCache, ctx, log);
+
BinaryMetadataHandler metaHnd = new BinaryMetadataHandler() {
@Override public void addMeta(int typeId, BinaryType newMeta) throws BinaryObjectException {
assert newMeta != null;
assert newMeta instanceof BinaryTypeImpl;
- BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
+ if (!discoveryStarted) {
+ BinaryMetadataHolder holder = metadataLocCache.get(typeId);
- if (metaDataCache == null) {
- BinaryMetadata oldMeta = metaBuf.get(typeId);
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+ BinaryMetadata oldMeta = holder != null ? holder.metadata() : null;
- if (oldMeta != mergedMeta) {
- synchronized (this) {
- mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+ BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, ((BinaryTypeImpl)newMeta).metadata());
- if (oldMeta != mergedMeta)
- metaBuf.put(typeId, mergedMeta);
- else
- return;
- }
+ if (oldMeta != mergedMeta)
+ metadataLocCache.putIfAbsent(typeId, new BinaryMetadataHolder(mergedMeta, 0, 0));
- if (metaDataCache == null)
- return;
- else
- metaBuf.remove(typeId);
- }
- else
- return;
+ return;
}
- assert metaDataCache != null;
+ BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
CacheObjectBinaryProcessorImpl.this.addMeta(typeId, newMeta0.wrap(binaryCtx));
}
@Override public BinaryType metadata(int typeId) throws BinaryObjectException {
- if (metaDataCache == null)
- U.awaitQuiet(startLatch);
-
return CacheObjectBinaryProcessorImpl.this.metadata(typeId);
}
+
+ @Override public BinaryType metadata(int typeId, int schemaId) throws BinaryObjectException {
+ return CacheObjectBinaryProcessorImpl.this.metadata(typeId, schemaId);
+ }
};
BinaryMarshaller bMarsh0 = (BinaryMarshaller)marsh;
@@ -265,111 +219,34 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
}
}
+ /**
+ * @param lsnr Listener.
+ */
+ public void addBinaryMetadataUpdateListener(BinaryMetadataUpdatedListener lsnr) {
+ if (transport != null)
+ transport.addBinaryMetadataUpdateListener(lsnr);
+ }
+
/** {@inheritDoc} */
@Override public void stop(boolean cancel) {
if (ctx.clientNode())
ctx.event().removeLocalEventListener(clientDisconLsnr); // The disconnect listener is removed on client nodes only.
- }
- /** {@inheritDoc} */
- @Override public void onContinuousProcessorStarted(GridKernalContext ctx) throws IgniteCheckedException {
- if (clientNode && !ctx.isDaemon()) {
- ctx.continuous().registerStaticRoutine(
- CU.UTILITY_CACHE_NAME,
- new MetaDataEntryListener(),
- new MetaDataEntryFilter(),
- null);
- }
+ if (transport != null) // Nothing to stop when no transport is set; the cancel flag is not consulted.
+ transport.stop();
}
/** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public void onUtilityCacheStarted() throws IgniteCheckedException {
- IgniteCacheProxy<Object, Object> proxy = ctx.cache().jcache(CU.UTILITY_CACHE_NAME);
-
- boolean old = proxy.context().deploy().ignoreOwnership(true);
-
- try {
- metaDataCache = (IgniteCacheProxy)proxy.withNoRetries();
- }
- finally {
- proxy.context().deploy().ignoreOwnership(old);
- }
-
- if (clientNode) {
- assert !metaDataCache.context().affinityNode();
-
- while (true) {
- ClusterNode oldestSrvNode = ctx.discovery().oldestAliveCacheServerNode(AffinityTopologyVersion.NONE);
-
- if (oldestSrvNode == null)
- break;
-
- GridCacheQueryManager qryMgr = metaDataCache.context().queries();
-
- CacheQuery<Map.Entry<BinaryMetadataKey, BinaryMetadata>> qry =
- qryMgr.createScanQuery(new MetaDataPredicate(), null, false);
-
- qry.keepAll(false);
-
- qry.projection(ctx.cluster().get().forNode(oldestSrvNode));
-
- try (GridCloseableIterator<Map.Entry<BinaryMetadataKey, BinaryMetadata>> entries = qry.executeScanQuery()) {
- for (Map.Entry<BinaryMetadataKey, BinaryMetadata> e : entries) {
- assert e.getKey() != null : e;
- assert e.getValue() != null : e;
-
- addClientCacheMetaData(e.getKey(), e.getValue());
- }
- }
- catch (IgniteCheckedException e) {
- if (!ctx.discovery().alive(oldestSrvNode) || !ctx.discovery().pingNode(oldestSrvNode.id()))
- continue;
- else
- throw e;
- }
- catch (CacheException e) {
- if (X.hasCause(e, ClusterTopologyCheckedException.class, ClusterTopologyException.class))
- continue;
- else
- throw e;
- }
-
- break;
- }
- }
-
- for (Map.Entry<Integer, BinaryMetadata> e : metaBuf.entrySet())
- addMeta(e.getKey(), e.getValue().wrap(binaryCtx));
-
- metaBuf.clear();
-
- startLatch.countDown();
+ @Override public void onDisconnected(IgniteFuture<?> reconnectFut) throws IgniteCheckedException {
+ if (transport != null) // reconnectFut is not used; the event is only forwarded to the transport when one is set.
+ transport.onDisconnected();
}
- /**
- * @param key Metadata key.
- * @param newMeta Metadata.
- */
- private void addClientCacheMetaData(BinaryMetadataKey key, final BinaryMetadata newMeta) {
- int key0 = key.typeId();
-
- clientMetaDataCache.compute(key0, new ConcurrentHashMap8.BiFun<Integer, BinaryTypeImpl, BinaryTypeImpl>() {
- @Override public BinaryTypeImpl apply(Integer key, BinaryTypeImpl oldMeta) {
- BinaryMetadata res;
-
- BinaryMetadata oldMeta0 = oldMeta != null ? oldMeta.metadata() : null;
-
- try {
- res = BinaryUtils.mergeMetadata(oldMeta0, newMeta);
- }
- catch (BinaryObjectException ignored) {
- res = oldMeta0;
- }
+ /**
+ * {@inheritDoc}
+ * <p>
+ * After delegating to the superclass, sets {@code discoveryStarted} to {@code true}. The metadata
+ * handler's {@code addMeta} reads that flag to decide between a local-cache-only merge and a call
+ * to the processor's own {@code addMeta}.
+ */
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ super.onKernalStart();
- return res != null ? res.wrap(binaryCtx) : null;
- }
- });
+ discoveryStarted = true;
}
/** {@inheritDoc} */
@@ -383,7 +260,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
/**
* @param obj Object.
* @return Bytes.
- * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+ * @throws BinaryObjectException If failed.
*/
public byte[] marshal(@Nullable Object obj) throws BinaryObjectException {
byte[] arr = binaryMarsh.marshal(obj);
@@ -397,7 +274,7 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
* @param ptr Off-heap pointer.
* @param forceHeap If {@code true} creates heap-based object.
* @return Object.
- * @throws org.apache.ignite.binary.BinaryObjectException If failed.
+ * @throws BinaryObjectException If failed.
*/
public Object unmarshal(long ptr, boolean forceHeap) throws BinaryObjectException {
assert ptr > 0 : ptr;
@@ -536,54 +413,94 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
BinaryMetadata newMeta0 = ((BinaryTypeImpl)newMeta).metadata();
- final BinaryMetadataKey key = new BinaryMetadataKey(typeId);
-
try {
- BinaryMetadata oldMeta = metaDataCache.localPeek(key);
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
+ BinaryMetadataHolder metaHolder = metadataLocCache.get(typeId);
- AffinityTopologyVersion topVer = ctx.cache().context().lockedTopologyVersion(null);
+ BinaryMetadata oldMeta = metaHolder != null ? metaHolder.metadata() : null;
- if (topVer == null)
- topVer = ctx.cache().context().exchange().readyAffinityVersion();
+ BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta0);
- BinaryObjectException err = metaDataCache.invoke(topVer, key, new MetadataProcessor(mergedMeta));
+ MetadataUpdateResult res = transport.requestMetadataUpdate(mergedMeta).get();
- if (err != null)
- throw err;
+ assert res != null;
+
+ if (res.rejected())
+ throw res.error();
}
- catch (CacheException e) {
+ catch (IgniteCheckedException e) {
throw new BinaryObjectException("Failed to update meta data for type: " + newMeta.typeName(), e);
}
}
/** {@inheritDoc} */
- @Nullable @Override public BinaryType metadata(final int typeId) throws BinaryObjectException {
- try {
- if (clientNode) {
- BinaryType typeMeta = clientMetaDataCache.get(typeId);
+ @Nullable @Override public BinaryType metadata(final int typeId) {
+ BinaryMetadataHolder holder = metadataLocCache.get(typeId);
+
+ if (holder == null) {
+ if (ctx.clientNode()) { // Only client nodes ask the transport for metadata that is missing locally.
+ try {
+ transport.requestUpToDateMetadata(typeId).get();
- if (typeMeta != null)
- return typeMeta;
+ holder = metadataLocCache.get(typeId); // Re-read the local cache once the request future has completed.
+ }
+ catch (IgniteCheckedException ignored) {
+ // No-op: holder stays null, so the method falls through and returns null.
+ }
+ }
+ }
- BinaryMetadata meta = metaDataCache.getTopologySafe(new BinaryMetadataKey(typeId));
+ if (holder != null) {
+ if (holder.pendingVersion() - holder.acceptedVersion() > 0) { // Pending version is ahead of the accepted one: wait for that update.
+ GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(typeId, holder.pendingVersion());
- return meta != null ? meta.wrap(binaryCtx) : null;
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException ignored) {
+ // No-op: the metadata of the holder obtained above is returned either way.
+ }
}
- else {
- BinaryMetadataKey key = new BinaryMetadataKey(typeId);
- BinaryMetadata meta = metaDataCache.localPeek(key);
+ return holder.metadata().wrap(binaryCtx); // NOTE(review): holder is not re-read after the wait, unlike in metadata(typeId, schemaId) - confirm this is intended.
+ }
+ else
+ return null;
+ }
- if (meta == null && !metaDataCache.context().preloader().syncFuture().isDone())
- meta = metaDataCache.getTopologySafe(key);
+ /**
+ * {@inheritDoc}
+ * <p>
+ * On a client node the metadata is requested through the transport when the type is not cached
+ * locally or the cached metadata lacks the given schema. On other nodes, if the cached holder has
+ * a pending version ahead of the accepted one, the call waits for that update and re-reads the cache.
+ *
+ * @return Metadata wrapped with {@code binaryCtx}, or {@code null} if none is cached for the type.
+ */
+ @Nullable @Override public BinaryType metadata(final int typeId, final int schemaId) {
+ BinaryMetadataHolder holder = metadataLocCache.get(typeId);
+
+ if (ctx.clientNode()) {
+ if (holder == null || !holder.metadata().hasSchema(schemaId)) {
+ try {
+ transport.requestUpToDateMetadata(typeId).get();
- return meta != null ? meta.wrap(binaryCtx) : null;
+ holder = metadataLocCache.get(typeId);
+ }
+ catch (IgniteCheckedException ignored) {
+ // No-op: whatever holder was read before the request is used below.
+ }
}
}
- catch (CacheException e) {
- throw new BinaryObjectException(e);
+ else {
+ // holder is null when the type is unknown locally; guard against it instead of failing with an NPE.
+ if (holder != null && holder.pendingVersion() - holder.acceptedVersion() > 0) {
+ GridFutureAdapter<MetadataUpdateResult> fut = transport.awaitMetadataUpdate(
+ typeId,
+ holder.pendingVersion());
+
+ try {
+ fut.get();
+ }
+ catch (IgniteCheckedException ignored) {
+ // No-op: the cache is re-read below regardless of the outcome.
+ }
+
+ holder = metadataLocCache.get(typeId);
+ }
}
+
+ return holder != null ? holder.metadata().wrap(binaryCtx) : null;
}
/** {@inheritDoc} */
@@ -595,12 +512,10 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
for (Integer typeId : typeIds)
keys.add(new BinaryMetadataKey(typeId));
- Map<BinaryMetadataKey, BinaryMetadata> meta = metaDataCache.getAll(keys);
-
- Map<Integer, BinaryType> res = U.newHashMap(meta.size());
+ Map<Integer, BinaryType> res = U.newHashMap(metadataLocCache.size());
- for (Map.Entry<BinaryMetadataKey, BinaryMetadata> e : meta.entrySet())
- res.put(e.getKey().typeId(), e.getValue().wrap(binaryCtx));
+ for (Map.Entry<Integer, BinaryMetadataHolder> e : metadataLocCache.entrySet())
+ res.put(e.getKey(), e.getValue().metadata().wrap(binaryCtx));
return res;
}
@@ -612,22 +527,11 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
/** {@inheritDoc} */
@SuppressWarnings("unchecked")
@Override public Collection<BinaryType> metadata() throws BinaryObjectException {
- if (clientNode)
- return F.viewReadOnly(clientMetaDataCache.values(), new IgniteClosure<BinaryTypeImpl, BinaryType>() {
- @Override public BinaryType apply(BinaryTypeImpl meta) {
- return meta;
- }
- });
- else {
- return F.viewReadOnly(metaDataCache.entrySetx(metaPred),
- new C1<Cache.Entry<BinaryMetadataKey, BinaryMetadata>, BinaryType>() {
- private static final long serialVersionUID = 0L;
-
- @Override public BinaryType apply(Cache.Entry<BinaryMetadataKey, BinaryMetadata> e) {
- return e.getValue().wrap(binaryCtx);
- }
- });
- }
+ return F.viewReadOnly(metadataLocCache.values(), new IgniteClosure<BinaryMetadataHolder, BinaryType>() {
+ @Override public BinaryType apply(BinaryMetadataHolder metaHolder) {
+ return metaHolder.metadata().wrap(binaryCtx); // Each holder in metadataLocCache is exposed as its metadata wrapped with binaryCtx; no client/server distinction remains.
+ }
+ });
}
/** {@inheritDoc} */
@@ -907,127 +811,33 @@ public class CacheObjectBinaryProcessorImpl extends IgniteCacheObjectProcessorIm
return null;
}
- /**
- * Processor responsible for metadata update.
- */
- private static class MetadataProcessor
- implements EntryProcessor<BinaryMetadataKey, BinaryMetadata, BinaryObjectException>, Externalizable {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private BinaryMetadata newMeta;
-
- /**
- * For {@link Externalizable}.
- */
- public MetadataProcessor() {
- // No-op.
- }
-
- /**
- * @param newMeta New metadata.
- */
- private MetadataProcessor(BinaryMetadata newMeta) {
- assert newMeta != null;
-
- this.newMeta = newMeta;
- }
-
- /** {@inheritDoc} */
- @Override public BinaryObjectException process(MutableEntry<BinaryMetadataKey, BinaryMetadata> entry,
- Object... args) {
- try {
- BinaryMetadata oldMeta = entry.getValue();
-
- BinaryMetadata mergedMeta = BinaryUtils.mergeMetadata(oldMeta, newMeta);
-
- if (mergedMeta != oldMeta)
- entry.setValue(mergedMeta);
-
- return null;
- }
- catch (BinaryObjectException e) {
- return e;
- }
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(newMeta);
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- newMeta = (BinaryMetadata)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MetadataProcessor.class, this);
- }
- }
-
- /**
- *
- */
- class MetaDataEntryListener implements CacheEntryUpdatedListener<BinaryMetadataKey, BinaryMetadata> {
- /** {@inheritDoc} */
- @Override public void onUpdated(
- Iterable<CacheEntryEvent<? extends BinaryMetadataKey, ? extends BinaryMetadata>> evts)
- throws CacheEntryListenerException {
- for (CacheEntryEvent<? extends BinaryMetadataKey, ? extends BinaryMetadata> evt : evts) {
- assert evt.getEventType() == EventType.CREATED || evt.getEventType() == EventType.UPDATED : evt;
-
- BinaryMetadataKey key = evt.getKey();
-
- final BinaryMetadata newMeta = evt.getValue();
-
- assert newMeta != null : evt;
-
- addClientCacheMetaData(key, newMeta);
- }
- }
-
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MetaDataEntryListener.class, this);
- }
+ /**
+ * {@inheritDoc}
+ *
+ * @return Always {@code BINARY_PROC}.
+ */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return BINARY_PROC;
}
- /**
- *
- */
- static class MetaDataEntryFilter implements CacheEntryEventSerializableFilter<Object, Object> {
- /** */
- private static final long serialVersionUID = 0L;
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Copies every entry of {@code metadataLocCache} into a new map and adds it to the bag as common
+ * data under {@code BINARY_PROC.ordinal()}, unless the bag reports that common data has already
+ * been collected for that key.
+ */
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ if (!dataBag.commonDataCollectedFor(BINARY_PROC.ordinal())) {
+ Map<Integer, BinaryMetadataHolder> res = U.newHashMap(metadataLocCache.size());
- /** {@inheritDoc} */
- @Override public boolean evaluate(CacheEntryEvent<?, ?> evt) throws CacheEntryListenerException {
- return evt.getKey() instanceof BinaryMetadataKey;
- }
+ for (Map.Entry<Integer,BinaryMetadataHolder> e : metadataLocCache.entrySet())
+ res.put(e.getKey(), e.getValue());
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MetaDataEntryFilter.class, this);
+ dataBag.addGridCommonData(BINARY_PROC.ordinal(), (Serializable) res); // The cast assumes the map returned by U.newHashMap is Serializable - TODO confirm.
}
}
- /**
- *
- */
- static class MetaDataPredicate implements IgniteBiPredicate<Object, Object> {
- /** */
- private static final long serialVersionUID = 0L;
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Reads the common data as a map of type ID to metadata holder and, when it is present, stores a
+ * new holder for every entry in {@code metadataLocCache}.
+ */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ Map<Integer, BinaryMetadataHolder> receivedData = (Map<Integer, BinaryMetadataHolder>) data.commonData();
- /** {@inheritDoc} */
- @Override public boolean apply(Object key, Object val) {
- return key instanceof BinaryMetadataKey;
- }
+ if (receivedData != null) {
+ for (Map.Entry<Integer, BinaryMetadataHolder> e : receivedData.entrySet()) {
+ BinaryMetadataHolder holder = e.getValue();
- /** {@inheritDoc} */
- @Override public String toString() {
- return S.toString(MetaDataPredicate.class, this);
+ metadataLocCache.put(e.getKey(), new BinaryMetadataHolder(holder.metadata(), holder.pendingVersion(), holder.pendingVersion())); // The received pending version is passed for both version arguments; put() replaces any existing local entry without merging - NOTE(review): confirm that is intended.
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/44cf1d21/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/ClientMetadataRequestFuture.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/ClientMetadataRequestFuture.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/ClientMetadataRequestFuture.java
new file mode 100644
index 0000000..658e9da
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/cache/binary/ClientMetadataRequestFuture.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.cache.binary;
+
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.binary.BinaryObjectException;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.GridTopic;
+import org.apache.ignite.internal.managers.communication.GridIoManager;
+import org.apache.ignite.internal.managers.communication.GridIoPolicy;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Future responsible for requesting up-to-date metadata from any server node in the cluster
+ * and for blocking a thread on a client node until the response arrives.
+ * <p>
+ * It copes with the situation where the node currently asked for the metadata leaves the cluster:
+ * in that case the future simply re-requests the metadata from the next node of its snapshot.
+ */
+final class ClientMetadataRequestFuture extends GridFutureAdapter<MetadataUpdateResult> {
+    /** Reference passed to {@code U.logger} when the shared logger is resolved. */
+    private static final AtomicReference<IgniteLogger> logRef = new AtomicReference<>();
+
+    /** Logger shared by all instances; resolved by the first constructor call that finds it unset. */
+    private static IgniteLogger log;
+
+    /** IO manager used to send the metadata request. */
+    private final GridIoManager ioMgr;
+
+    /** Discovery manager used to list alive server nodes and to check that a node is still known. */
+    private final GridDiscoveryManager discoMgr;
+
+    /** ID of the type whose metadata is requested. */
+    private final int typeId;
+
+    /** Map of ongoing requests; the entry for {@link #typeId} is removed when this future completes. */
+    private final Map<Integer, ClientMetadataRequestFuture> syncMap;
+
+    /** Server nodes not tried yet, snapshotted when the future is created. */
+    private final Queue<ClusterNode> aliveSrvNodes;
+
+    /** Node the latest request was sent to, or {@code null}; accessed under {@code synchronized (this)}. */
+    private ClusterNode pendingNode;
+
+    /**
+     * @param ctx Context.
+     * @param typeId ID of the type to request metadata for.
+     * @param syncMap Map to store futures for ongoing requests.
+     */
+    ClientMetadataRequestFuture(
+        GridKernalContext ctx,
+        int typeId,
+        Map<Integer, ClientMetadataRequestFuture> syncMap
+    ) {
+        ioMgr = ctx.io();
+        discoMgr = ctx.discovery();
+        aliveSrvNodes = new LinkedList<>(discoMgr.aliveServerNodes());
+
+        this.typeId = typeId;
+        this.syncMap = syncMap;
+
+        if (log == null)
+            log = U.logger(ctx, logRef, ClientMetadataRequestFuture.class);
+    }
+
+    /**
+     * Sends the metadata request to the next server node of the snapshot, moving on to the following
+     * node when sending fails or the node is no longer known to discovery. If no node could be
+     * selected, completes this future with a failure result.
+     */
+    void requestMetadata() {
+        boolean noSrvsInCluster;
+
+        synchronized (this) {
+            while (!aliveSrvNodes.isEmpty()) {
+                ClusterNode srvNode = aliveSrvNodes.poll();
+
+                try {
+                    ioMgr.sendToGridTopic(srvNode,
+                        GridTopic.TOPIC_METADATA_REQ,
+                        new MetadataRequestMessage(typeId),
+                        GridIoPolicy.SYSTEM_POOL);
+
+                    // The node may have left while the message was being sent; try the next one.
+                    if (discoMgr.node(srvNode.id()) == null)
+                        continue;
+
+                    pendingNode = srvNode;
+
+                    break;
+                }
+                catch (IgniteCheckedException ignored) {
+                    // The message names binary metadata: that is what this future requests.
+                    U.warn(log,
+                        "Failed to request binary metadata from remote node (proceeding with the next one): "
+                            + srvNode);
+                }
+            }
+
+            noSrvsInCluster = pendingNode == null;
+        }
+
+        // Completion happens outside the synchronized block.
+        if (noSrvsInCluster)
+            onDone(MetadataUpdateResult.createFailureResult(
+                new BinaryObjectException(
+                    "All server nodes have left grid, cannot request metadata [typeId: "
+                        + typeId + "]")));
+    }
+
+    /**
+     * If left node is the one latest metadata request was sent to,
+     * request is sent again to the next node in topology.
+     *
+     * @param leftNodeId ID of left node.
+     */
+    void onNodeLeft(UUID leftNodeId) {
+        boolean reqAgain = false;
+
+        synchronized (this) {
+            if (pendingNode != null && pendingNode.id().equals(leftNodeId)) {
+                aliveSrvNodes.remove(pendingNode);
+
+                pendingNode = null;
+
+                reqAgain = true;
+            }
+        }
+
+        // The re-request is issued after the lock above has been released.
+        if (reqAgain)
+            requestMetadata();
+    }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Asserts that {@code res} is not {@code null} and, when this call actually completes the future,
+     * removes the entry for {@link #typeId} from the map of ongoing requests.
+     */
+    @Override public boolean onDone(@Nullable MetadataUpdateResult res, @Nullable Throwable err) {
+        assert res != null;
+
+        boolean done = super.onDone(res, err);
+
+        if (done)
+            syncMap.remove(typeId);
+
+        return done;
+    }
+}