You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by ag...@apache.org on 2017/01/19 12:05:43 UTC
[3/5] ignite git commit: IGNITE-4157 Use discovery custom messages
instead of marshaller cache - Fixes #1271.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
new file mode 100644
index 0000000..1cff458
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingItem.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.io.Serializable;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Used to exchange mapping information in the "new mapping added" and "missing mapping requested" flows.
+ * See {@link GridMarshallerMappingProcessor} javadoc for more information.
+ */
+public final class MarshallerMappingItem implements Serializable {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Platform id of the mapping. */
+ private final byte platformId;
+
+ /** Type id of the mapping. */
+ private final int typeId;
+
+ /** Class name; {@code null} when the item only requests a missing mapping. Mutable, see {@link #className(String)}. */
+ private String clsName;
+
+ /**
+ * Class name may be null when instance is created to request missing mapping from cluster.
+ *
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ * @param clsName Class name. May be null in case when the item is created to request missing mapping from grid.
+ */
+ public MarshallerMappingItem(byte platformId, int typeId, @Nullable String clsName) {
+ this.platformId = platformId;
+ this.typeId = typeId;
+ this.clsName = clsName;
+ }
+
+ /** @return Type id. */
+ public int typeId() {
+ return typeId;
+ }
+
+ /** @return Platform id. */
+ public byte platformId() {
+ return platformId;
+ }
+
+ /** @return Class name, or {@code null} if none has been supplied (e.g. the item requests a missing mapping). */
+ @Nullable public String className() {
+ return clsName;
+ }
+
+ /**
+ * Sets the class name. The class name takes part in {@link #equals(Object)} and {@link #hashCode()},
+ * so it must not be changed while this item is used as a key of a hash-based collection.
+ *
+ * @param clsName Class name.
+ */
+ public void className(String clsName) {
+ this.clsName = clsName;
+ }
+
+ /**
+ * Items are equal when platform id, type id and class name (which may be {@code null}) all match.
+ *
+ * {@inheritDoc}
+ */
+ @Override public boolean equals(Object obj) {
+ if (obj == this)
+ return true;
+
+ if (!(obj instanceof MarshallerMappingItem))
+ return false;
+
+ MarshallerMappingItem that = (MarshallerMappingItem) obj;
+
+ return platformId == that.platformId
+ && typeId == that.typeId
+ && (clsName != null ? clsName.equals(that.clsName) : that.clsName == null);
+ }
+
+ /**
+ * Combines the same three fields as {@link #equals(Object)}; a {@code null} class name contributes 0.
+ *
+ * {@inheritDoc}
+ */
+ @Override public int hashCode() {
+ return 31 * 31 * ((int) platformId) + 31 * typeId + (clsName != null ? clsName.hashCode() : 0);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return "[platformId: " + platformId + ", typeId:" + typeId + ", clsName: " + clsName + "]";
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
new file mode 100644
index 0000000..b974882
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MarshallerMappingTransport.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.util.concurrent.ConcurrentMap;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.MarshallerContextImpl;
+import org.apache.ignite.internal.managers.discovery.DiscoveryCustomMessage;
+import org.apache.ignite.internal.managers.discovery.GridDiscoveryManager;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides capabilities of sending custom discovery events to propose new mapping
+ * or request missing mapping to {@link MarshallerContextImpl}.
+ *
+ * For more information about particular events see documentation of {@link GridMarshallerMappingProcessor}.
+ */
+public final class MarshallerMappingTransport {
+ /** Kernal context; passed to each {@code ClientRequestFuture} created in {@code requestMapping}. */
+ private final GridKernalContext ctx;
+
+ /** Discovery manager obtained from {@code ctx}; supplies the local node id and sends custom discovery events. */
+ private final GridDiscoveryManager discoMgr;
+
+ /** Futures of mapping exchanges in progress, keyed by mapping item; such a future removes itself once it is done. */
+ private final ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchSyncMap;
+
+ /** Futures of client mapping requests in progress, keyed by mapping item. */
+ private final ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap;
+
+ /** Stopping flag; set by {@code markStopping()} and never cleared by this class. */
+ private volatile boolean stopping;
+
+ /**
+ * Stores the given maps by reference (they are not copied), so futures registered by this transport
+ * are visible to whoever supplied the maps.
+ *
+ * @param ctx Context.
+ * @param mappingExchSyncMap Map of futures awaiting a mapping exchange result.
+ * @param clientReqSyncMap Map of futures awaiting a response to a client mapping request.
+ */
+ MarshallerMappingTransport(
+ GridKernalContext ctx,
+ ConcurrentMap<MarshallerMappingItem, GridFutureAdapter<MappingExchangeResult>> mappingExchSyncMap,
+ ConcurrentMap<MarshallerMappingItem, ClientRequestFuture> clientReqSyncMap
+ ) {
+ this.ctx = ctx;
+ discoMgr = ctx.discovery();
+ this.mappingExchSyncMap = mappingExchSyncMap;
+ this.clientReqSyncMap = clientReqSyncMap;
+
+ stopping = false;
+ }
+
+ /**
+ * Returns a future to wait on for acceptance of a mapping that is already present in {@code cache}.
+ *
+ * If a future is already registered for an equal item, that future is returned as is. Otherwise the new
+ * future is registered first and only then the cache is checked; the future is completed right away
+ * with a successful result when the cached mapping is already accepted.
+ * NOTE(review): registering before checking presumably prevents missing an acceptance that arrives
+ * in between - confirm against GridMarshallerMappingProcessor.
+ *
+ * @param item Item.
+ * @param cache Mappings by type id; asserted to contain an entry for {@code item.typeId()}.
+ * @return Future of the mapping exchange result (either the one registered earlier or the new one).
+ */
+ public GridFutureAdapter<MappingExchangeResult> awaitMappingAcceptance(
+ MarshallerMappingItem item, ConcurrentMap<Integer,
+ MappedName> cache
+ ) {
+ GridFutureAdapter<MappingExchangeResult> fut = new MappingExchangeResultFuture(item);
+
+ GridFutureAdapter<MappingExchangeResult> oldFut = mappingExchSyncMap.putIfAbsent(item, fut);
+
+ if (oldFut != null)
+ return oldFut;
+
+ MappedName mappedName = cache.get(item.typeId());
+
+ assert mappedName != null;
+
+ // Double check whether the mapping is accepted; the first check was in MarshallerContextImpl::registerClassName.
+ if (mappedName.accepted())
+ fut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
+
+ return fut;
+ }
+
+ /**
+ * Proposes a new mapping to the cluster by sending a {@code MappingProposedMessage} custom discovery event,
+ * unless the proposal can be answered or joined locally.
+ *
+ * Outcomes, in the order they are checked:
+ * <ul>
+ * <li>a future is already registered for an equal item: that future is returned and nothing is sent;</li>
+ * <li>{@code cache} maps the type id to a different class name: the new future is completed with a failure
+ * result carrying a "Duplicate ID" exception;</li>
+ * <li>{@code cache} holds the same class name and the mapping is accepted: completed with a successful result;</li>
+ * <li>{@code cache} holds the same class name, not yet accepted, and this transport is stopping: completed
+ * with an exchange-disabled result;</li>
+ * <li>{@code cache} holds the same class name, not yet accepted, not stopping: the future is returned
+ * uncompleted and nothing is sent (presumably an earlier proposal is still in flight - TODO confirm);</li>
+ * <li>{@code cache} has no entry: the message is sent with the local node id and the future is returned.</li>
+ * </ul>
+ *
+ * @param item Item.
+ * @param cache Mappings by type id.
+ * @return Future of the mapping exchange result.
+ * @throws IgniteCheckedException Declared by this method; presumably raised when sending the custom event
+ * fails - TODO confirm against GridDiscoveryManager.
+ */
+ public GridFutureAdapter<MappingExchangeResult> proposeMapping(MarshallerMappingItem item, ConcurrentMap<Integer, MappedName> cache) throws IgniteCheckedException {
+ GridFutureAdapter<MappingExchangeResult> fut = new MappingExchangeResultFuture(item);
+
+ GridFutureAdapter<MappingExchangeResult> oldFut = mappingExchSyncMap.putIfAbsent(item, fut);
+
+ if (oldFut != null)
+ return oldFut;
+ else {
+ // Double check; the first check was in the caller: MarshallerContextImpl::registerClassName.
+ MappedName mapping = cache.get(item.typeId());
+
+ if (mapping != null) {
+ String mappedClsName = mapping.className();
+
+ if (!mappedClsName.equals(item.className()))
+ fut.onDone(MappingExchangeResult.createFailureResult(duplicateMappingException(item, mappedClsName)));
+ else if (mapping.accepted())
+ fut.onDone(MappingExchangeResult.createSuccessfulResult(mappedClsName));
+ else if (stopping)
+ fut.onDone(MappingExchangeResult.createExchangeDisabledResult());
+
+ return fut;
+ }
+ }
+
+ DiscoveryCustomMessage msg = new MappingProposedMessage(item, discoMgr.localNode().id());
+ discoMgr.sendCustomEvent(msg);
+
+ return fut;
+ }
+
+ /**
+ * Requests a mapping that is missing locally.
+ *
+ * If a request future is already registered for an equal item, that future is returned. Otherwise the new
+ * future is registered and the cache is checked once more: when it already holds a mapping for the type id,
+ * the future is completed with a successful result carrying the cached class name (the accepted flag is
+ * not consulted here); when it does not, {@code requestMapping()} is invoked on the new future.
+ *
+ * @param item Item.
+ * @param cache Mappings by type id.
+ * @return Future of the mapping exchange result (either the one registered earlier or the new one).
+ */
+ public GridFutureAdapter<MappingExchangeResult> requestMapping(
+ MarshallerMappingItem item,
+ ConcurrentMap<Integer, MappedName> cache
+ ) {
+ ClientRequestFuture newFut = new ClientRequestFuture(ctx, item, clientReqSyncMap);
+
+ ClientRequestFuture oldFut = clientReqSyncMap.putIfAbsent(item, newFut);
+
+ if (oldFut != null)
+ return oldFut;
+
+ MappedName mappedName = cache.get(item.typeId());
+
+ if (mappedName != null) {
+ newFut.onDone(MappingExchangeResult.createSuccessfulResult(mappedName.className()));
+
+ return newFut;
+ }
+
+ newFut.requestMapping();
+
+ return newFut;
+ }
+
+ /**
+ * Builds (does not throw) the exception describing a type id that is already mapped to another class name.
+ *
+ * @param item Item carrying the platform id, type id and the newly proposed class name.
+ * @param mappedClsName Mapped class name, i.e. the one already registered for the type id.
+ * @return Exception with a "Duplicate ID" message listing platform id, type id, old and new class names.
+ */
+ private IgniteCheckedException duplicateMappingException(MarshallerMappingItem item, String mappedClsName) {
+ return new IgniteCheckedException("Duplicate ID [platformId="
+ + item.platformId()
+ + ", typeId="
+ + item.typeId()
+ + ", oldCls="
+ + mappedClsName
+ + ", newCls="
+ + item.className() + "]");
+ }
+
+ /** Raises the stopping flag; there is no way to lower it again through this class. */
+ public void markStopping() {
+ stopping = true;
+ }
+
+ /** @return {@code true} once {@code markStopping()} has been called. */
+ public boolean stopping() {
+ return stopping;
+ }
+
+ /**
+ * Future to wait for mapping exchange result to arrive. Removes itself from map when completed.
+ * Being a non-static inner class, it works on the {@code mappingExchSyncMap} of the enclosing transport.
+ */
+ private class MappingExchangeResultFuture extends GridFutureAdapter<MappingExchangeResult> {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Item this future is registered under in the exchange sync map. */
+ private final MarshallerMappingItem mappingItem;
+
+ /**
+ * @param mappingItem Mapping item.
+ */
+ private MappingExchangeResultFuture(MarshallerMappingItem mappingItem) {
+ this.mappingItem = mappingItem;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * The result is asserted to be non-null, and {@code err} is not passed on: {@code null} is handed to the
+ * parent as the error. When the parent reports that this call completed the future, the future removes
+ * its own entry (matched by both key and value) from the exchange sync map.
+ */
+ @Override public boolean onDone(@Nullable MappingExchangeResult res, @Nullable Throwable err) {
+ assert res != null;
+
+ boolean done = super.onDone(res, null);
+
+ if (done)
+ mappingExchSyncMap.remove(mappingItem, this);
+
+ return done;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingRequestMessage.java
new file mode 100644
index 0000000..ba2afce
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingRequestMessage.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * A client node receives discovery messages asynchronously,
+ * so it is possible that all server nodes have already accepted a new mapping while a client is still unaware of it.
+ *
+ * In this case the client node may receive a request to perform some operation on a class
+ * whose mapping it does not know yet.
+ * Upon receiving such request client sends an explicit {@link MissingMappingRequestMessage} mapping request
+ * to one of server nodes using CommunicationSPI and waits for {@link MissingMappingResponseMessage} response.
+ *
+ * If server node where mapping request was sent to leaves the cluster for some reason
+ * mapping request gets automatically resent to the next alive server node in topology.
+ */
+public class MissingMappingRequestMessage implements Message {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Platform id of the requested mapping. */
+ private byte platformId;
+
+ /** Type id of the requested mapping. */
+ private int typeId;
+
+ /**
+ * Default constructor; the fields are filled in afterwards by {@link #readFrom(ByteBuffer, MessageReader)}.
+ */
+ public MissingMappingRequestMessage() {
+ // No-op.
+ }
+
+ /**
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ */
+ MissingMappingRequestMessage(byte platformId, int typeId) {
+ this.platformId = platformId;
+ this.typeId = typeId;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Writes the header (once), then {@code platformId} and {@code typeId} in that order. The switch is entered
+ * at the writer's current state and falls through, so a call that returned {@code false} part-way
+ * is resumed at the field that was not written.
+ */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByte("platformId", platformId))
+ return false;
+
+ writer.incrementState();
+
+ case 1:
+ if (!writer.writeInt("typeId", typeId))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Reads {@code platformId} and {@code typeId} in that order, entering the fall-through switch at the reader's
+ * current state. Returns {@code false} as soon as a field has not been read completely. The state is
+ * advanced after every field, including the last one, mirroring {@link #writeTo(ByteBuffer, MessageWriter)}
+ * and {@code MissingMappingResponseMessage}.
+ */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ platformId = reader.readByte("platformId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ typeId = reader.readInt("typeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ // Advance past the last field too, so the state never points at an already-read field.
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(MissingMappingRequestMessage.class);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return 120, the type code this message writes into its header.
+ */
+ @Override public byte directType() {
+ return 120;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return 2, the number of fields handled by {@code writeTo} and {@code readFrom}.
+ */
+ @Override public byte fieldsCount() {
+ return 2;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /** @return Platform id. */
+ public byte platformId() {
+ return platformId;
+ }
+
+ /** @return Type id. */
+ public int typeId() {
+ return typeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MissingMappingRequestMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingResponseMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingResponseMessage.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingResponseMessage.java
new file mode 100644
index 0000000..d468302
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/marshaller/MissingMappingResponseMessage.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.internal.processors.marshaller;
+
+import java.nio.ByteBuffer;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.plugin.extensions.communication.Message;
+import org.apache.ignite.plugin.extensions.communication.MessageReader;
+import org.apache.ignite.plugin.extensions.communication.MessageWriter;
+
+/**
+ * On receiving a {@link MissingMappingRequestMessage} mapping request server node looks up class name
+ * for requested platformId and typeId in its local marshaller cache and sends back
+ * a {@link MissingMappingResponseMessage} mapping response with resolved class name.
+ */
+public class MissingMappingResponseMessage implements Message {
+ /** Serialization version. */
+ private static final long serialVersionUID = 0L;
+
+ /** Platform id of the mapping. */
+ private byte platformId;
+
+ /** Type id of the mapping. */
+ private int typeId;
+
+ /** Class name carried by the response. */
+ private String clsName;
+
+ /**
+ * Default constructor; the fields are filled in afterwards by {@code readFrom}.
+ */
+ public MissingMappingResponseMessage() {
+ }
+
+ /**
+ * @param platformId Platform id.
+ * @param typeId Type id.
+ * @param clsName Class name.
+ */
+ MissingMappingResponseMessage(byte platformId, int typeId, String clsName) {
+ this.platformId = platformId;
+ this.typeId = typeId;
+ this.clsName = clsName;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Writes the header (once), then {@code platformId}, {@code typeId} and {@code clsName} in that order.
+ * The switch is entered at the writer's current state and falls through, so a call that returned
+ * {@code false} part-way is resumed at the field that was not written.
+ */
+ @Override public boolean writeTo(ByteBuffer buf, MessageWriter writer) {
+ writer.setBuffer(buf);
+
+ if (!writer.isHeaderWritten()) {
+ if (!writer.writeHeader(directType(), fieldsCount()))
+ return false;
+
+ writer.onHeaderWritten();
+ }
+
+ switch (writer.state()) {
+ case 0:
+ if (!writer.writeByte("platformId", platformId))
+ return false;
+
+ writer.incrementState();
+ case 1:
+ if (!writer.writeInt("typeId", typeId))
+ return false;
+
+ writer.incrementState();
+
+ case 2:
+ if (!writer.writeString("clsName", clsName))
+ return false;
+
+ writer.incrementState();
+ }
+
+ return true;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * Reads {@code platformId}, {@code typeId} and {@code clsName} in that order, entering the fall-through
+ * switch at the reader's current state. Returns {@code false} as soon as a field has not been read
+ * completely; the state is advanced after every field that was read.
+ */
+ @Override public boolean readFrom(ByteBuffer buf, MessageReader reader) {
+ reader.setBuffer(buf);
+
+ if (!reader.beforeMessageRead())
+ return false;
+
+ switch (reader.state()) {
+ case 0:
+ platformId = reader.readByte("platformId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 1:
+ typeId = reader.readInt("typeId");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+
+ case 2:
+ clsName = reader.readString("clsName");
+
+ if (!reader.isLastRead())
+ return false;
+
+ reader.incrementState();
+ }
+
+ return reader.afterMessageRead(MissingMappingResponseMessage.class);
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return 121, the type code this message writes into its header.
+ */
+ @Override public byte directType() {
+ return 121;
+ }
+
+ /**
+ * {@inheritDoc}
+ *
+ * @return 3, the number of fields handled by {@code writeTo} and {@code readFrom}.
+ */
+ @Override public byte fieldsCount() {
+ return 3;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onAckReceived() {
+ // No-op.
+ }
+
+ /**
+ * @return Platform id.
+ */
+ public byte platformId() {
+ return platformId;
+ }
+
+ /**
+ * @return Type id.
+ */
+ public int typeId() {
+ return typeId;
+ }
+
+ /**
+ * @return Class name carried by the response.
+ */
+ public String className() {
+ return clsName;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(MissingMappingResponseMessage.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
index 526e222..1a45f7d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/platform/utils/PlatformUtils.java
@@ -17,6 +17,22 @@
package org.apache.ignite.internal.processors.platform.utils;
+import java.lang.reflect.Field;
+import java.math.BigDecimal;
+import java.security.Timestamp;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.cache.CacheException;
+import javax.cache.event.CacheEntryEvent;
+import javax.cache.event.CacheEntryListenerException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
@@ -42,8 +58,8 @@ import org.apache.ignite.internal.processors.platform.memory.PlatformInputStream
import org.apache.ignite.internal.processors.platform.memory.PlatformMemory;
import org.apache.ignite.internal.processors.platform.memory.PlatformMemoryUtils;
import org.apache.ignite.internal.processors.platform.memory.PlatformOutputStream;
-import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.X;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
@@ -51,23 +67,6 @@ import org.apache.ignite.lang.IgniteUuid;
import org.apache.ignite.logger.NullLogger;
import org.jetbrains.annotations.Nullable;
-import javax.cache.CacheException;
-import javax.cache.event.CacheEntryEvent;
-import javax.cache.event.CacheEntryListenerException;
-import java.math.BigDecimal;
-import java.security.Timestamp;
-import java.lang.reflect.Field;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-
import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_PREFIX;
/**
@@ -844,25 +843,16 @@ public class PlatformUtils {
*/
@SuppressWarnings("deprecation")
public static GridBinaryMarshaller marshaller() {
- try {
- IgniteConfiguration cfg = new IgniteConfiguration();
-
- BinaryContext ctx =
- new BinaryContext(BinaryNoopMetadataHandler.instance(), cfg, new NullLogger());
-
- BinaryMarshaller marsh = new BinaryMarshaller();
+ BinaryContext ctx =
+ new BinaryContext(BinaryNoopMetadataHandler.instance(), new IgniteConfiguration(), new NullLogger());
- String workDir = U.workDirectory(cfg.getWorkDirectory(), cfg.getIgniteHome());
+ BinaryMarshaller marsh = new BinaryMarshaller();
- marsh.setContext(new MarshallerContextImpl(workDir, null));
+ marsh.setContext(new MarshallerContextImpl(null));
- ctx.configure(marsh, new IgniteConfiguration());
+ ctx.configure(marsh, new IgniteConfiguration());
- return new GridBinaryMarshaller(ctx);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
+ return new GridBinaryMarshaller(ctx);
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
index 560d474..ffc1ded 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/plugin/IgnitePluginProcessor.java
@@ -39,8 +39,13 @@ import org.apache.ignite.plugin.Extension;
import org.apache.ignite.plugin.ExtensionRegistry;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.PluginProvider;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.GridDiscoveryData;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag.JoiningNodeDiscoveryData;
import org.jetbrains.annotations.Nullable;
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.PLUGIN;
+
/**
*
*/
@@ -148,44 +153,87 @@ public class IgnitePluginProcessor extends GridProcessorAdapter {
/** {@inheritDoc} */
@Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
- return DiscoveryDataExchangeType.PLUGIN;
+ return PLUGIN;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void collectJoiningNodeData(DiscoveryDataBag dataBag) {
+ Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId());
+
+ if (pluginsData != null)
+ dataBag.addJoiningNodeData(PLUGIN.ordinal(), pluginsData);
}
/** {@inheritDoc} */
- @Nullable @Override public Serializable collectDiscoveryData(UUID nodeId) {
- HashMap<String, Serializable> discData = null;
+ @Override public void collectGridNodeData(DiscoveryDataBag dataBag) {
+ Serializable pluginsData = getDiscoveryData(dataBag.joiningNodeId());
+
+ if (pluginsData != null)
+ dataBag.addNodeSpecificData(PLUGIN.ordinal(), pluginsData);
+ }
+
+ /**
+ * @param joiningNodeId Joining node id.
+ */
+ private Serializable getDiscoveryData(UUID joiningNodeId) {
+ HashMap<String, Serializable> pluginsData = null;
for (Map.Entry<String, PluginProvider> e : plugins.entrySet()) {
- Serializable data = e.getValue().provideDiscoveryData(nodeId);
+ Serializable data = e.getValue().provideDiscoveryData(joiningNodeId);
if (data != null) {
- if (discData == null)
- discData = new HashMap<>();
+ if (pluginsData == null)
+ pluginsData = new HashMap<>();
- discData.put(e.getKey(), data);
+ pluginsData.put(e.getKey(), data);
}
}
- return discData;
+ return pluginsData;
}
/** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID nodeId, UUID rmtNodeId, Serializable data) {
- Map<String, Serializable> discData = (Map<String, Serializable>)data;
+ @Override public void onJoiningNodeDataReceived(JoiningNodeDiscoveryData data) {
+ if (data.hasJoiningNodeData()) {
+ Map<String, Serializable> pluginsData = (Map<String, Serializable>) data.joiningNodeData();
- if (discData != null) {
- for (Map.Entry<String, Serializable> e : discData.entrySet()) {
- PluginProvider provider = plugins.get(e.getKey());
+ applyPluginsData(data.joiningNodeId(), pluginsData);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onGridDataReceived(GridDiscoveryData data) {
+ Map<UUID, Serializable> nodeSpecificData = data.nodeSpecificData();
+
+ if (nodeSpecificData != null) {
+ UUID joiningNodeId = data.joiningNodeId();
+
+ for (Serializable v : nodeSpecificData.values()) {
+ if (v != null) {
+ Map<String, Serializable> pluginsData = (Map<String, Serializable>) v;
- if (provider != null)
- provider.receiveDiscoveryData(nodeId, e.getValue());
- else
- U.warn(log, "Received discovery data for unknown plugin: " + e.getKey());
+ applyPluginsData(joiningNodeId, pluginsData);
+ }
}
}
}
/**
+ * @param nodeId Node id.
+ * @param pluginsData Plugins data.
+ */
+ private void applyPluginsData(UUID nodeId, Map<String, Serializable> pluginsData) {
+ for (Map.Entry<String, Serializable> e : pluginsData.entrySet()) {
+ PluginProvider provider = plugins.get(e.getKey());
+
+ if (provider != null)
+ provider.receiveDiscoveryData(nodeId, e.getValue());
+ else
+ U.warn(log, "Received discovery data for unknown plugin: " + e.getKey());
+ }
+ }
+
+ /**
* Print plugins information.
*/
private void ackPluginsInfo() {
@@ -227,7 +275,7 @@ public class IgnitePluginProcessor extends GridProcessorAdapter {
/**
* @return Map extension interface to array of implementation.
*/
- public Map<Class<?>, Object[]> createExtensionMap() {
+ Map<Class<?>, Object[]> createExtensionMap() {
Map<Class<?>, Object[]> extensions = new HashMap<>(extensionsCollector.size() * 2, 0.5f);
for (Map.Entry<Class<?>, List<Object>> entry : extensionsCollector.entrySet()) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
index 89140b9..f42815b 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/pool/PoolProcessor.java
@@ -118,11 +118,6 @@ public class PoolProcessor extends GridProcessorAdapter {
return ctx.utilityCachePool();
- case GridIoPolicy.MARSH_CACHE_POOL:
- assert ctx.marshallerCachePool() != null : "Marshaller cache pool is not configured.";
-
- return ctx.marshallerCachePool();
-
case GridIoPolicy.IGFS_POOL:
assert ctx.getIgfsExecutorService() != null : "IGFS pool is not configured.";
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
index d26242d..986fff7 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/service/GridServiceProcessor.java
@@ -69,6 +69,8 @@ import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.query.CacheQuery;
import org.apache.ignite.internal.processors.cache.query.GridCacheQueryManager;
import org.apache.ignite.internal.processors.cache.transactions.IgniteInternalTx;
+import org.apache.ignite.internal.processors.marshaller.MappingAcceptedMessage;
+import org.apache.ignite.internal.processors.marshaller.MappingProposedMessage;
import org.apache.ignite.internal.processors.task.GridInternal;
import org.apache.ignite.internal.processors.timeout.GridTimeoutObject;
import org.apache.ignite.internal.util.GridEmptyIterator;
@@ -1580,12 +1582,15 @@ public class GridServiceProcessor extends GridProcessorAdapter {
if (evt instanceof DiscoveryCustomEvent) {
DiscoveryCustomMessage msg = ((DiscoveryCustomEvent)evt).customMessage();
- topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
-
if (msg instanceof CacheAffinityChangeMessage) {
if (!((CacheAffinityChangeMessage)msg).exchangeNeeded())
return;
}
+
+ if (msg instanceof MappingProposedMessage || msg instanceof MappingAcceptedMessage)
+ return;
+
+ topVer = ((DiscoveryCustomEvent)evt).affinityTopologyVersion();
}
else
topVer = new AffinityTopologyVersion(((DiscoveryEvent)evt).topologyVersion(), 0);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
index 252ddd6..6898c36 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/MarshallerContext.java
@@ -26,23 +26,36 @@ public interface MarshallerContext {
/**
* Registers class with provided type ID.
*
- * @param id Type ID.
- * @param cls Class.
+ * @param typeId Type ID.
+ * @param platformId id of a platform to register class.
+ * @param clsName class name.
* @return Whether class was registered.
* @throws IgniteCheckedException In case of error.
*/
- public boolean registerClass(int id, Class cls) throws IgniteCheckedException;
+ public boolean registerClassName(byte platformId, int typeId, String clsName) throws IgniteCheckedException;
/**
* Gets class for provided type ID.
*
- * @param id Type ID.
+ * @param typeId Type ID.
* @param ldr Class loader.
* @return Class.
* @throws ClassNotFoundException If class was not found.
* @throws IgniteCheckedException In case of any other error.
*/
- public Class getClass(int id, ClassLoader ldr) throws ClassNotFoundException, IgniteCheckedException;
+ public Class getClass(int typeId, ClassLoader ldr) throws ClassNotFoundException, IgniteCheckedException;
+
+
+ /**
+ * Gets class name for provided (platformId, typeId) pair.
+ *
+ * @param platformId id of a platform the class was registered for.
+ * @param typeId Type ID.
+ * @return Class name
+ * @throws ClassNotFoundException If class was not found.
+ * @throws IgniteCheckedException In case of any other error.
+ */
+ public String getClassName(byte platformId, int typeId) throws ClassNotFoundException, IgniteCheckedException;
/**
* Checks whether the given type is a system one - JDK class or Ignite class.
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
index 923f385..60a627a 100644
--- a/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
+++ b/modules/core/src/main/java/org/apache/ignite/marshaller/optimized/OptimizedMarshallerUtils.java
@@ -35,6 +35,8 @@ import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.marshaller.MarshallerContext;
import org.apache.ignite.marshaller.jdk.JdkMarshaller;
+import static org.apache.ignite.internal.MarshallerPlatformIds.JAVA_ID;
+
/**
* Miscellaneous utility methods to facilitate {@link OptimizedMarshaller}.
*/
@@ -159,7 +161,7 @@ class OptimizedMarshallerUtils {
try {
mapOff = GridUnsafe.objectFieldOffset(HashSet.class.getDeclaredField("map"));
}
- catch (NoSuchFieldException e) {
+ catch (NoSuchFieldException ignored) {
try {
// Workaround for legacy IBM JRE.
mapOff = GridUnsafe.objectFieldOffset(HashSet.class.getDeclaredField("backingMap"));
@@ -203,7 +205,7 @@ class OptimizedMarshallerUtils {
boolean registered;
try {
- registered = ctx.registerClass(typeId, cls);
+ registered = ctx.registerClassName(JAVA_ID, typeId, cls.getName());
}
catch (IgniteCheckedException e) {
throw new IOException("Failed to register class: " + cls.getName(), e);
@@ -246,7 +248,7 @@ class OptimizedMarshallerUtils {
* Gets descriptor for provided ID.
*
* @param clsMap Class descriptors by class map.
- * @param id ID.
+ * @param typeId ID.
* @param ldr Class loader.
* @param ctx Context.
* @param mapper ID mapper.
@@ -256,17 +258,17 @@ class OptimizedMarshallerUtils {
*/
static OptimizedClassDescriptor classDescriptor(
ConcurrentMap<Class, OptimizedClassDescriptor> clsMap,
- int id,
+ int typeId,
ClassLoader ldr,
MarshallerContext ctx,
OptimizedMarshallerIdMapper mapper) throws IOException, ClassNotFoundException {
Class cls;
try {
- cls = ctx.getClass(id, ldr);
+ cls = ctx.getClass(typeId, ldr);
}
catch (IgniteCheckedException e) {
- throw new IOException("Failed to resolve class for ID: " + id, e);
+ throw new IOException("Failed to resolve class for ID: " + typeId, e);
}
OptimizedClassDescriptor desc = clsMap.get(cls);
@@ -307,7 +309,7 @@ class OptimizedMarshallerUtils {
}
}
}
- catch (NoSuchFieldException e) {
+ catch (NoSuchFieldException ignored) {
// No-op.
}
catch (IllegalAccessException e) {
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java b/modules/core/src/main/java/org/apache/ignite/spi/communication/tcp/TcpCommunicationSpi.java
old mode 100644
new mode 100755
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
new file mode 100644
index 0000000..96df255
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoveryDataBag.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi.discovery;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.ignite.internal.GridComponent;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Provides interface for {@link GridComponent} to collect and exchange discovery data both on
+ * joining node and on cluster nodes; data is kept per component ID (joining node, common and node-specific).
+ * <p>
+ * It only organizes interaction with components and doesn't provide any capabilities of converting collected data
+ * into formats eligible for transmitting over media (like marshalling, compressing and so on).
+ */
+public class DiscoveryDataBag {
+ /**
+ * Facade interface representing {@link DiscoveryDataBag} object with discovery data from joining node.
+ */
+ public interface JoiningNodeDiscoveryData {
+ /** @return ID of the joining node. */
+ UUID joiningNodeId();
+
+ /** @return {@code true} if joining node data is present for the component this facade is bound to. */
+ boolean hasJoiningNodeData();
+
+ /** @return Joining node data for the component this facade is bound to. */
+ Serializable joiningNodeData();
+ }
+
+ /**
+ * Facade interface representing {@link DiscoveryDataBag} object with discovery data collected in the grid.
+ */
+ public interface GridDiscoveryData {
+ /** @return ID of the joining node. */
+ UUID joiningNodeId();
+
+ /** @return Common data for the component this facade is bound to. */
+ Serializable commonData();
+
+ /** @return Node-specific data for the component this facade is bound to, keyed by UUID. */
+ Map<UUID, Serializable> nodeSpecificData();
+ }
+
+ /**
+ * Facade over the enclosing bag's joining node data, bound to one component ID at a time.
+ */
+ private final class JoiningNodeDiscoveryDataImpl implements JoiningNodeDiscoveryData {
+ /** ID of the component this facade currently serves. */
+ private int cmpId;
+
+ /** {@inheritDoc} */
+ @Override public UUID joiningNodeId() {
+ return joiningNodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean hasJoiningNodeData() {
+ return joiningNodeData.containsKey(cmpId);
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public Serializable joiningNodeData() {
+ return joiningNodeData.get(cmpId);
+ }
+
+ /**
+ * @param cmpId Component ID to bind this facade to.
+ */
+ private void setComponentId(int cmpId) {
+ this.cmpId = cmpId;
+ }
+ }
+
+ /**
+ * Facade over the enclosing bag's common and node-specific data, bound to one component ID at a time.
+ */
+ private final class GridDiscoveryDataImpl implements GridDiscoveryData {
+ /** ID of the component this facade currently serves. */
+ private int cmpId;
+
+ /** Node-specific data of the current component only; rebuilt on every {@link #componentId(int)} call. */
+ private Map<UUID, Serializable> nodeSpecificData
+ = new LinkedHashMap<>(DiscoveryDataBag.this.nodeSpecificData.size());
+
+ /** {@inheritDoc} */
+ @Override public UUID joiningNodeId() {
+ return joiningNodeId;
+ }
+
+ /** {@inheritDoc} */
+ @Override @Nullable public Serializable commonData() {
+ if (commonData != null)
+ return commonData.get(cmpId);
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Map<UUID, Serializable> nodeSpecificData() {
+ return nodeSpecificData;
+ }
+
+ /**
+ * @param cmpId Component ID to bind this facade to; the node-specific view is rebuilt for it.
+ */
+ private void componentId(int cmpId) {
+ this.cmpId = cmpId;
+
+ reinitNodeSpecData(cmpId);
+ }
+
+ /**
+ * @param cmpId Component ID whose entries are copied from the enclosing bag's node-specific data.
+ */
+ private void reinitNodeSpecData(int cmpId) {
+ nodeSpecificData.clear();
+
+ for (Map.Entry<UUID, Map<Integer, Serializable>> e : DiscoveryDataBag.this.nodeSpecificData.entrySet()) {
+ if (e.getValue() != null && e.getValue().containsKey(cmpId))
+ nodeSpecificData.put(e.getKey(), e.getValue().get(cmpId));
+ }
+ }
+ }
+
+ /** Key under which node-specific data added by components is collected.
+ * A component may not know the ID of the node it is running on, so the node-specific data it adds
+ * is first collected under this key and then moved to another map under the correct UUID key.
+ */
+ private static final UUID DEFAULT_KEY = null;
+
+ /** ID of the joining node. */
+ private UUID joiningNodeId;
+
+ /**
+ * Component IDs with already initialized common discovery data; not set by the one-argument constructor.
+ */
+ private Set<Integer> cmnDataInitializedCmps;
+
+ /** Joining node data keyed by component ID. */
+ private Map<Integer, Serializable> joiningNodeData = new HashMap<>();
+
+ /** Common grid data keyed by component ID. */
+ private Map<Integer, Serializable> commonData = new HashMap<>();
+
+ /** Node-specific data: UUID key (or {@link #DEFAULT_KEY}) to a map of component ID to data. */
+ private Map<UUID, Map<Integer, Serializable>> nodeSpecificData = new LinkedHashMap<>();
+
+ /** Lazily created facade reused by {@link #newJoinerDiscoveryData(int)}. */
+ private JoiningNodeDiscoveryDataImpl newJoinerData;
+
+ /** Lazily created facade reused by {@link #gridDiscoveryData(int)}. */
+ private GridDiscoveryDataImpl gridData;
+
+ /**
+ * @param joiningNodeId Joining node id.
+ */
+ public DiscoveryDataBag(UUID joiningNodeId) {
+ this.joiningNodeId = joiningNodeId;
+ }
+
+ /**
+ * @param joiningNodeId Joining node id.
+ * @param cmnDataInitializedCmps Component IDs with already initialized common discovery data.
+ */
+ public DiscoveryDataBag(UUID joiningNodeId, Set<Integer> cmnDataInitializedCmps) {
+ this.joiningNodeId = joiningNodeId;
+ this.cmnDataInitializedCmps = cmnDataInitializedCmps;
+ }
+
+ /**
+ * @return ID of the joining node.
+ */
+ public UUID joiningNodeId() {
+ return joiningNodeId;
+ }
+
+ /**
+ * @param cmpId Component ID the returned facade is bound to; one facade instance is reused across calls.
+ */
+ public GridDiscoveryData gridDiscoveryData(int cmpId) {
+ if (gridData == null)
+ gridData = new GridDiscoveryDataImpl();
+
+ gridData.componentId(cmpId);
+
+ return gridData;
+ }
+
+ /**
+ * @param cmpId Component ID the returned facade is bound to; one facade instance is reused across calls.
+ */
+ public JoiningNodeDiscoveryData newJoinerDiscoveryData(int cmpId) {
+ if (newJoinerData == null)
+ newJoinerData = new JoiningNodeDiscoveryDataImpl();
+
+ newJoinerData.setComponentId(cmpId);
+
+ return newJoinerData;
+ }
+
+ /**
+ * @param cmpId Component ID the data belongs to.
+ * @param data Joining node data to store, replacing any previous value for that component.
+ */
+ public void addJoiningNodeData(Integer cmpId, Serializable data) {
+ joiningNodeData.put(cmpId, data);
+ }
+
+ /**
+ * @param cmpId Component ID the data belongs to.
+ * @param data Common grid data to store, replacing any previous value for that component.
+ */
+ public void addGridCommonData(Integer cmpId, Serializable data) {
+ commonData.put(cmpId, data);
+ }
+
+ /**
+ * @param cmpId Component ID the data belongs to.
+ * @param data Node-specific data to store under {@link #DEFAULT_KEY}, replacing any previous value.
+ */
+ public void addNodeSpecificData(Integer cmpId, Serializable data) {
+ if (!nodeSpecificData.containsKey(DEFAULT_KEY))
+ nodeSpecificData.put(DEFAULT_KEY, new HashMap<Integer, Serializable>());
+
+ nodeSpecificData.get(DEFAULT_KEY).put(cmpId, data);
+ }
+
+ /**
+ * @param cmpId Component ID to look up in the set given to the two-argument constructor (asserted non-null).
+ */
+ public boolean commonDataCollectedFor(Integer cmpId) {
+ assert cmnDataInitializedCmps != null;
+
+ return cmnDataInitializedCmps.contains(cmpId);
+ }
+
+ /**
+ * @param joinNodeData Joining node data to add to this bag via {@code putAll}.
+ */
+ public void joiningNodeData(Map<Integer, Serializable> joinNodeData) {
+ joiningNodeData.putAll(joinNodeData);
+ }
+
+ /**
+ * @param cmnData Common data to add to this bag via {@code putAll}.
+ */
+ public void commonData(Map<Integer, Serializable> cmnData) {
+ commonData.putAll(cmnData);
+ }
+
+ /**
+ * @param nodeSpecData Node-specific data to add via {@code putAll}; an existing per-key map is replaced.
+ */
+ public void nodeSpecificData(Map<UUID, Map<Integer, Serializable>> nodeSpecData) {
+ nodeSpecificData.putAll(nodeSpecData);
+ }
+
+ /**
+ * @return Internal (not copied) map of joining node data keyed by component ID.
+ */
+ public Map<Integer, Serializable> joiningNodeData() {
+ return joiningNodeData;
+ }
+
+ /**
+ * @return Internal (not copied) map of common data keyed by component ID.
+ */
+ public Map<Integer, Serializable> commonData() {
+ return commonData;
+ }
+
+ /**
+ * @return Data stored under {@link #DEFAULT_KEY}, or {@code null} if nothing is stored under that key.
+ */
+ @Nullable public Map<Integer, Serializable> localNodeSpecificData() {
+ return nodeSpecificData.get(DEFAULT_KEY);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
index e591470..d56c943 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/DiscoverySpiDataExchange.java
@@ -17,10 +17,6 @@
package org.apache.ignite.spi.discovery;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.UUID;
-
/**
* Handler for initial data exchange between Ignite nodes. Data exchange
* is initiated by a new node when it tries to join topology and finishes
@@ -32,17 +28,14 @@ public interface DiscoverySpiDataExchange {
* on new node that joins topology to transfer its data to existing
* nodes and on all existing nodes to transfer their data to new node.
*
- * @param joiningNodeId ID of new node that joins topology.
- * @return Collection of discovery data objects from different components.
+ * @param dataBag {@link DiscoveryDataBag} object managing discovery data during node joining process.
*/
- public Map<Integer, Serializable> collect(UUID joiningNodeId);
+ public DiscoveryDataBag collect(DiscoveryDataBag dataBag);
/**
* Notifies discovery manager about data received from remote node.
*
- * @param joiningNodeId ID of new node that joins topology.
- * @param nodeId ID of the node provided data.
- * @param data Collection of discovery data objects from different components.
+ * @param dataBag {@link DiscoveryDataBag} object holding discovery data received from remote node.
*/
- public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data);
+ public void onExchange(DiscoveryDataBag dataBag);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
index 8928f28..a5cedf6 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ClientImpl.java
@@ -70,6 +70,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodesRing;
import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
@@ -623,7 +624,9 @@ class ClientImpl extends TcpDiscoveryImpl {
if (locNode.order() > 0)
node = locNode.clientReconnectNode();
- msg = new TcpDiscoveryJoinRequestMessage(node, spi.collectExchangeData(getLocalNodeId()));
+ msg = new TcpDiscoveryJoinRequestMessage(
+ node,
+ spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())));
}
else
msg = new TcpDiscoveryClientReconnectMessage(getLocalNodeId(), rmtNodeId, lastMsgId);
@@ -1048,7 +1051,7 @@ class ClientImpl extends TcpDiscoveryImpl {
/** {@inheritDoc} */
@Override protected void body() throws InterruptedException {
- TcpDiscoveryAbstractMessage msg;
+ TcpDiscoveryAbstractMessage msg = null;
while (!Thread.currentThread().isInterrupted()) {
Socket sock;
@@ -1062,7 +1065,8 @@ class ClientImpl extends TcpDiscoveryImpl {
continue;
}
- msg = queue.poll();
+ if (msg == null)
+ msg = queue.poll();
if (msg == null) {
mux.wait();
@@ -1661,11 +1665,10 @@ class ClientImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Added new node to topology: " + node);
- Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+ DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
- if (data != null)
- spi.onExchange(newNodeId, newNodeId, data,
- U.resolveClassLoader(spi.ignite().configuration()));
+ if (dataPacket != null && dataPacket.hasJoiningNodeData())
+ spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
}
}
else {
@@ -1684,13 +1687,10 @@ class ClientImpl extends TcpDiscoveryImpl {
if (getLocalNodeId().equals(msg.nodeId())) {
if (joining()) {
- Map<UUID, Map<Integer, byte[]>> dataMap = msg.clientDiscoData();
+ DiscoveryDataPacket dataContainer = msg.clientDiscoData();
- if (dataMap != null) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- spi.onExchange(getLocalNodeId(), entry.getKey(), entry.getValue(),
- U.resolveClassLoader(spi.ignite().configuration()));
- }
+ if (dataContainer != null)
+ spi.onExchange(dataContainer, U.resolveClassLoader(spi.ignite().configuration()));
locNode.setAttributes(msg.clientNodeAttributes());
locNode.visible(true);
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
index 7cc35e4..df782e5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/ServerImpl.java
@@ -100,6 +100,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiThread;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNodesRing;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoverySpiState;
@@ -964,7 +965,7 @@ class ServerImpl extends TcpDiscoveryImpl {
@SuppressWarnings({"BusyWait"})
private boolean sendJoinRequestMessage() throws IgniteSpiException {
TcpDiscoveryAbstractMessage joinReq = new TcpDiscoveryJoinRequestMessage(locNode,
- spi.collectExchangeData(getLocalNodeId()));
+ spi.collectExchangeData(new DiscoveryDataPacket(getLocalNodeId())));
// Time when it has been detected, that addresses from IP finder do not respond.
long noResStart = 0;
@@ -1933,7 +1934,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
// Do not need this data for client reconnect.
- addedMsg.oldNodesDiscoveryData(null);
+ if (addedMsg.gridDiscoveryData() != null)
+ addedMsg.clearDiscoveryData();
}
else if (msg instanceof TcpDiscoveryNodeAddFinishedMessage) {
TcpDiscoveryNodeAddFinishedMessage addFinishMsg = (TcpDiscoveryNodeAddFinishedMessage)msg;
@@ -1943,44 +1945,25 @@ class ServerImpl extends TcpDiscoveryImpl {
msg = addFinishMsg;
- Map<UUID, Map<Integer, byte[]>> discoData = addFinishMsg.clientDiscoData();
+ DiscoveryDataPacket discoData = addFinishMsg.clientDiscoData();
- Set<UUID> replaced = null;
+ Set<Integer> mrgdCmnData = new HashSet<>();
+ Set<UUID> mrgdSpecData = new HashSet<>();
+
+ boolean allMerged = false;
for (TcpDiscoveryAbstractMessage msg0 : msgs) {
+
if (msg0 instanceof TcpDiscoveryNodeAddFinishedMessage) {
- Map<UUID, Map<Integer, byte[]>> existingDiscoData =
+ DiscoveryDataPacket existingDiscoData =
((TcpDiscoveryNodeAddFinishedMessage)msg0).clientDiscoData();
- // Check if already stored message contains the same data to do not store copies multiple times.
- if (existingDiscoData != null) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> e : discoData.entrySet()) {
- UUID nodeId = e.getKey();
-
- if (F.contains(replaced, nodeId))
- continue;
-
- Map<Integer, byte[]> existingData = existingDiscoData.get(e.getKey());
-
- if (existingData != null && mapsEqual(e.getValue(), existingData)) {
- e.setValue(existingData);
-
- if (replaced == null)
- replaced = new HashSet<>();
-
- boolean add = replaced.add(nodeId);
-
- assert add;
-
- if (replaced.size() == discoData.size())
- break;
- }
- }
-
- if (replaced != null && replaced.size() == discoData.size())
- break;
- }
+ if (existingDiscoData != null)
+ allMerged = discoData.mergeDataFrom(existingDiscoData, mrgdCmnData, mrgdSpecData);
}
+
+ if (allMerged)
+ break;
}
}
}
@@ -2011,29 +1994,6 @@ class ServerImpl extends TcpDiscoveryImpl {
}
/**
- * @param m1 Map 1.
- * @param m2 Map 2.
- * @return {@code True} if maps contain the same data.
- */
- private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
- if (m1 == m2)
- return true;
-
- if (m1.size() == m2.size()) {
- for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
- byte[] data = m2.get(e.getKey());
-
- if (!Arrays.equals(e.getValue(), data))
- return false;
- }
-
- return true;
- }
-
- return false;
- }
-
- /**
* Gets messages starting from provided ID (exclusive). If such
* message is not found, {@code null} is returned (this indicates
* a failure condition when it was already removed from queue).
@@ -3748,8 +3708,10 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Internal order has been assigned to node: " + node);
+ DiscoveryDataPacket data = msg.gridDiscoveryData();
+
TcpDiscoveryNodeAddedMessage nodeAddedMsg = new TcpDiscoveryNodeAddedMessage(locNodeId,
- node, msg.discoveryData(), spi.gridStartTime);
+ node, data, spi.gridStartTime);
nodeAddedMsg.client(msg.client());
@@ -3970,7 +3932,7 @@ class ServerImpl extends TcpDiscoveryImpl {
node.id());
if (node.isClient()) {
- addFinishMsg.clientDiscoData(msg.oldNodesDiscoveryData());
+ addFinishMsg.clientDiscoData(msg.gridDiscoveryData());
addFinishMsg.clientNodeAttributes(node.attributes());
}
@@ -4100,12 +4062,12 @@ class ServerImpl extends TcpDiscoveryImpl {
if (topChanged) {
assert !node.visible() : "Added visible node [node=" + node + ", locNode=" + locNode + ']';
- Map<Integer, byte[]> data = msg.newNodeDiscoveryData();
+ DiscoveryDataPacket dataPacket = msg.gridDiscoveryData();
- if (data != null)
- spi.onExchange(node.id(), node.id(), data, U.resolveClassLoader(spi.ignite().configuration()));
+ if (dataPacket.hasJoiningNodeData())
+ spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
- msg.addDiscoveryData(locNodeId, spi.collectExchangeData(node.id()));
+ spi.collectExchangeData(dataPacket);
processMessageFailedNodes(msg);
}
@@ -4116,8 +4078,7 @@ class ServerImpl extends TcpDiscoveryImpl {
}
if (msg.verified() && locNodeId.equals(node.id())) {
- // Discovery data.
- Map<UUID, Map<Integer, byte[]>> dataMap;
+ DiscoveryDataPacket dataPacket;
synchronized (mux) {
if (spiState == CONNECTING && locNode.internalOrder() != node.internalOrder()) {
@@ -4194,7 +4155,7 @@ class ServerImpl extends TcpDiscoveryImpl {
if (log.isDebugEnabled())
log.debug("Restored topology from node added message: " + ring);
- dataMap = msg.oldNodesDiscoveryData();
+ dataPacket = msg.gridDiscoveryData();
topHist.clear();
topHist.putAll(msg.topologyHistory());
@@ -4227,11 +4188,8 @@ class ServerImpl extends TcpDiscoveryImpl {
}
// Notify outside of synchronized block.
- if (dataMap != null) {
- for (Map.Entry<UUID, Map<Integer, byte[]>> entry : dataMap.entrySet())
- spi.onExchange(node.id(), entry.getKey(), entry.getValue(),
- U.resolveClassLoader(spi.ignite().configuration()));
- }
+ if (dataPacket != null)
+ spi.onExchange(dataPacket, U.resolveClassLoader(spi.ignite().configuration()));
processMessageFailedNodes(msg);
}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
index db85cc4..1f2f328 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/TcpDiscoverySpi.java
@@ -22,7 +22,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-import java.io.Serializable;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -51,7 +50,6 @@ import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.AddressResolver;
import org.apache.ignite.configuration.IgniteConfiguration;
-import org.apache.ignite.internal.GridComponent;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.F;
@@ -77,6 +75,7 @@ import org.apache.ignite.spi.IgniteSpiOperationTimeoutException;
import org.apache.ignite.spi.IgniteSpiOperationTimeoutHelper;
import org.apache.ignite.spi.IgniteSpiTimeoutObject;
import org.apache.ignite.spi.IgniteSpiVersionCheckException;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
import org.apache.ignite.spi.discovery.DiscoveryMetricsProvider;
import org.apache.ignite.spi.discovery.DiscoverySpi;
import org.apache.ignite.spi.discovery.DiscoverySpiCustomMessage;
@@ -85,6 +84,7 @@ import org.apache.ignite.spi.discovery.DiscoverySpiHistorySupport;
import org.apache.ignite.spi.discovery.DiscoverySpiListener;
import org.apache.ignite.spi.discovery.DiscoverySpiNodeAuthenticator;
import org.apache.ignite.spi.discovery.DiscoverySpiOrderSupport;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryStatistics;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
@@ -1679,66 +1679,49 @@ public class TcpDiscoverySpi extends IgniteSpiAdapter implements DiscoverySpi, T
}
/**
- * @param nodeId Node ID.
- * @return Marshalled exchange data.
+ * @param dataPacket Data packet to fill with marshalled discovery data; returned unchanged on daemon nodes.
*/
- protected Map<Integer, byte[]> collectExchangeData(UUID nodeId) {
+ DiscoveryDataPacket collectExchangeData(DiscoveryDataPacket dataPacket) {
if (locNode.isDaemon())
- return Collections.emptyMap();
+ return dataPacket;
- Map<Integer, Serializable> data = exchange.collect(nodeId);
+ assert dataPacket != null;
+ assert dataPacket.joiningNodeId() != null;
- assert data != null;
-
- Map<Integer, byte[]> data0 = U.newHashMap(data.size());
+ // Create data bag and pass it to exchange.collect().
+ DiscoveryDataBag dataBag = dataPacket.bagForDataCollection();
- for (Map.Entry<Integer, Serializable> entry : data.entrySet()) {
- try {
- byte[] bytes = U.marshal(marshaller(), entry.getValue());
+ exchange.collect(dataBag);
- data0.put(entry.getKey(), bytes);
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to marshal discovery data " +
- "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e);
- }
- }
+ // Marshal the collected bag into the packet and return the packet.
+ if (dataPacket.joiningNodeId().equals(locNode.id()))
+ dataPacket.marshalJoiningNodeData(dataBag, marshaller(), log);
+ else
+ dataPacket.marshalGridNodeData(dataBag, locNode.id(), marshaller(), log);
- return data0;
+ return dataPacket;
}
/**
- * @param joiningNodeID Joining node ID.
- * @param nodeId Remote node ID for which data is provided.
- * @param data Collection of marshalled discovery data objects from different components.
+ * @param dataPacket object holding discovery data collected during discovery process.
* @param clsLdr Class loader.
*/
- protected void onExchange(UUID joiningNodeID,
- UUID nodeId,
- Map<Integer, byte[]> data,
- ClassLoader clsLdr)
- {
+ protected void onExchange(DiscoveryDataPacket dataPacket, ClassLoader clsLdr) {
if (locNode.isDaemon())
return;
- Map<Integer, Serializable> data0 = U.newHashMap(data.size());
+ assert dataPacket != null;
+ assert dataPacket.joiningNodeId() != null;
- for (Map.Entry<Integer, byte[]> entry : data.entrySet()) {
- try {
- Serializable compData = U.unmarshal(marshaller(), entry.getValue(), clsLdr);
+ DiscoveryDataBag dataBag;
+
+ if (dataPacket.joiningNodeId().equals(locNode.id()))
+ dataBag = dataPacket.unmarshalGridData(marshaller(), clsLdr, locNode.isClient(), log);
+ else
+ dataBag = dataPacket.unmarshalJoiningNodeData(marshaller(), clsLdr, locNode.isClient(), log);
- data0.put(entry.getKey(), compData);
- }
- catch (IgniteCheckedException e) {
- if (GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC.ordinal() == entry.getKey() &&
- X.hasCause(e, ClassNotFoundException.class) && locNode.isClient())
- U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored.");
- else
- U.error(log, "Failed to unmarshal discovery data for component: " + entry.getKey(), e);
- }
- }
- exchange.onExchange(joiningNodeID, nodeId, data0);
+ exchange.onExchange(dataBag);
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
new file mode 100644
index 0000000..1134de6
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/internal/DiscoveryDataPacket.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ignite.spi.discovery.tcp.internal;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.internal.util.typedef.X;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.marshaller.Marshaller;
+import org.apache.ignite.spi.discovery.DiscoveryDataBag;
+
+import static org.apache.ignite.internal.GridComponent.DiscoveryDataExchangeType.CONTINUOUS_PROC;
+
+/**
+ * Carries discovery data in marshalled form
+ * and provides a convenient way of converting it to and from {@link DiscoveryDataBag} objects.
+ */
+public class DiscoveryDataPacket implements Serializable {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** ID of the joining node. */
+ private final UUID joiningNodeId;
+
+ /** Marshalled joining node data, keyed by component ID. */
+ private Map<Integer, byte[]> joiningNodeData = new HashMap<>();
+
+ /** Marshalled common data, keyed by component ID. */
+ private Map<Integer, byte[]> commonData = new HashMap<>();
+
+ /** Marshalled node specific data: node ID -> (component ID -> marshalled data). */
+ private Map<UUID, Map<Integer, byte[]>> nodeSpecificData = new LinkedHashMap<>();
+
+ /**
+ * Creates a packet for the given joining node; all data maps start out empty.
+ *
+ * @param joiningNodeId Joining node id.
+ */
+ public DiscoveryDataPacket(UUID joiningNodeId) {
+ this.joiningNodeId = joiningNodeId;
+ }
+
+ /**
+ * @return ID of the joining node passed to the constructor.
+ */
+ public UUID joiningNodeId() {
+ return joiningNodeId;
+ }
+
+ /**
+ * Marshals data collected in the given bag into this packet. Common data is added to the packet's
+ * common data map. Local node specific data, if present, is marshalled into a separate map; entries
+ * that are byte-for-byte equal to an entry with the same key already stored for another node are
+ * dropped, and the remainder (if not empty) is stored under {@code nodeId}.
+ *
+ * @param bag Bag holding the common data and the local node specific data to marshal.
+ * @param nodeId ID under which the node specific data is stored in this packet.
+ * @param marsh Marshaller used to serialize the data.
+ * @param log Logger used to report entries that failed to marshal.
+ */
+ public void marshalGridNodeData(DiscoveryDataBag bag, UUID nodeId, Marshaller marsh, IgniteLogger log) {
+ marshalData(bag.commonData(), commonData, marsh, log);
+
+ Map<Integer, Serializable> locNodeSpecificData = bag.localNodeSpecificData();
+
+ if (locNodeSpecificData != null) {
+ Map<Integer, byte[]> marshLocNodeSpecificData = U.newHashMap(locNodeSpecificData.size());
+
+ marshalData(locNodeSpecificData, marshLocNodeSpecificData, marsh, log);
+
+ filterDuplicatedData(marshLocNodeSpecificData);
+
+ if (!marshLocNodeSpecificData.isEmpty())
+ nodeSpecificData.put(nodeId, marshLocNodeSpecificData);
+ }
+ }
+
+ /**
+ * Marshals joining node data collected in the given bag into this packet.
+ * Nothing is done if the bag holds no joining node data ({@code null});
+ * entries that fail to marshal are logged and left out.
+ *
+ * @param bag Bag holding the joining node data to marshal.
+ * @param marsh Marshaller used to serialize the data.
+ * @param log Logger used to report entries that failed to marshal.
+ */
+ public void marshalJoiningNodeData(DiscoveryDataBag bag, Marshaller marsh, IgniteLogger log) {
+ marshalData(bag.joiningNodeData(), joiningNodeData, marsh, log);
+ }
+
+ /**
+ * Unmarshals common and node specific data of this packet into a new {@link DiscoveryDataBag}.
+ * A map that is {@code null} or empty is not transferred to the bag, nodes without data are skipped,
+ * and entries that fail to unmarshal are logged and left out.
+ *
+ * @param marsh Marshaller used to deserialize the data.
+ * @param clsLdr Class loader.
+ * @param clientNode Client node flag; when {@code true}, a {@code ClassNotFoundException} for
+ * continuous processor data is logged as a warning instead of an error.
+ * @param log Logger.
+ * @return Bag created for the joining node and filled with the unmarshalled data.
+ */
+ public DiscoveryDataBag unmarshalGridData(
+ Marshaller marsh,
+ ClassLoader clsLdr,
+ boolean clientNode,
+ IgniteLogger log
+ ) {
+ DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId);
+
+ if (commonData != null && !commonData.isEmpty()) {
+ Map<Integer, Serializable> unmarshCommonData = unmarshalData(commonData, marsh, clsLdr, clientNode, log);
+
+ dataBag.commonData(unmarshCommonData);
+ }
+
+ if (nodeSpecificData != null && !nodeSpecificData.isEmpty()) {
+ Map<UUID, Map<Integer, Serializable>> unmarshNodeSpecData = U.newLinkedHashMap(nodeSpecificData.size());
+
+ for (Map.Entry<UUID, Map<Integer, byte[]>> nodeBinEntry : nodeSpecificData.entrySet()) {
+ Map<Integer, byte[]> nodeBinData = nodeBinEntry.getValue();
+
+ if (nodeBinData == null || nodeBinData.isEmpty())
+ continue;
+
+ Map<Integer, Serializable> unmarshData = unmarshalData(nodeBinData, marsh, clsLdr, clientNode, log);
+
+ unmarshNodeSpecData.put(nodeBinEntry.getKey(), unmarshData);
+ }
+
+ dataBag.nodeSpecificData(unmarshNodeSpecData);
+ }
+
+ return dataBag;
+ }
+
+ /**
+ * Unmarshals joining node data of this packet into a new {@link DiscoveryDataBag}.
+ * If the packet holds no joining node data the bag is returned without it;
+ * entries that fail to unmarshal are logged and left out.
+ *
+ * @param marsh Marshaller used to deserialize the data.
+ * @param clsLdr Class loader.
+ * @param clientNode Client node flag; when {@code true}, a {@code ClassNotFoundException} for
+ * continuous processor data is logged as a warning instead of an error.
+ * @param log Logger.
+ * @return Bag created for the joining node and filled with the unmarshalled joining node data.
+ */
+ public DiscoveryDataBag unmarshalJoiningNodeData(
+ Marshaller marsh,
+ ClassLoader clsLdr,
+ boolean clientNode,
+ IgniteLogger log
+ ) {
+ DiscoveryDataBag dataBag = new DiscoveryDataBag(joiningNodeId);
+
+ if (joiningNodeData != null && !joiningNodeData.isEmpty()) {
+ Map<Integer, Serializable> unmarshJoiningNodeData = unmarshalData(
+ joiningNodeData,
+ marsh,
+ clsLdr,
+ clientNode,
+ log);
+
+ dataBag.joiningNodeData(unmarshJoiningNodeData);
+ }
+
+ return dataBag;
+ }
+
+ /**
+ * @return {@code true} if this packet holds a non-empty map of joining node data.
+ */
+ public boolean hasJoiningNodeData() {
+ return joiningNodeData != null && !joiningNodeData.isEmpty();
+ }
+
+ /**
+ * @param nodeId Node id.
+ * @return {@code true} if node specific data is stored in this packet under the given node ID.
+ */
+ public boolean hasDataFromNode(UUID nodeId) {
+ return nodeSpecificData.containsKey(nodeId);
+ }
+
+ /**
+ * Replaces data of this packet with the equal data instances held by an existing packet.
+ * For each common data key and each node ID not yet listed in the given collections, the value is
+ * compared by content with the value stored in {@code existingDataPacket}; on a match the existing
+ * instance is put into this packet and the key is added to the corresponding collection.
+ *
+ * @param existingDataPacket Packet to take equal data instances from.
+ * @param mrgdCmnDataKeys Keys of common data that is already merged; updated by this method.
+ * @param mrgdSpecifDataKeys IDs of nodes whose specific data is already merged; updated by this method.
+ * @return {@code true} if the sizes of both collections match the sizes of the common and node
+ * specific data maps of this packet, i.e. all its data is considered merged.
+ */
+ public boolean mergeDataFrom(
+ DiscoveryDataPacket existingDataPacket,
+ Collection<Integer> mrgdCmnDataKeys,
+ Collection<UUID> mrgdSpecifDataKeys
+ ) {
+ if (commonData.size() != mrgdCmnDataKeys.size()) {
+ for (Map.Entry<Integer, byte[]> e : commonData.entrySet()) {
+ if (!mrgdCmnDataKeys.contains(e.getKey())) {
+ byte[] data = existingDataPacket.commonData.get(e.getKey());
+
+ if (data != null && Arrays.equals(e.getValue(), data)) {
+ e.setValue(data);
+
+ boolean add = mrgdCmnDataKeys.add(e.getKey());
+
+ assert add;
+
+ if (mrgdCmnDataKeys.size() == commonData.size())
+ break;
+ }
+ }
+ }
+ }
+
+ if (nodeSpecificData.size() != mrgdSpecifDataKeys.size()) {
+ for (Map.Entry<UUID, Map<Integer, byte[]>> e : nodeSpecificData.entrySet()) {
+ if (!mrgdSpecifDataKeys.contains(e.getKey())) {
+ Map<Integer, byte[]> data = existingDataPacket.nodeSpecificData.get(e.getKey());
+
+ if (data != null && mapsEqual(e.getValue(), data)) {
+ e.setValue(data);
+
+ boolean add = mrgdSpecifDataKeys.add(e.getKey());
+
+ assert add;
+
+ if (mrgdSpecifDataKeys.size() == nodeSpecificData.size())
+ break;
+ }
+ }
+ }
+ }
+
+ return (mrgdCmnDataKeys.size() == commonData.size()) && (mrgdSpecifDataKeys.size() == nodeSpecificData.size());
+ }
+
+ /**
+ * Compares two maps of marshalled data by content: the sizes must match and, for every key of the
+ * first map, the byte arrays of both maps must be equal according to {@code Arrays.equals}.
+ *
+ * @param m1 first map to compare.
+ * @param m2 second map to compare.
+ * @return {@code true} if the maps are the same instance or have equal content.
+ */
+ private boolean mapsEqual(Map<Integer, byte[]> m1, Map<Integer, byte[]> m2) {
+ if (m1 == m2)
+ return true;
+
+ if (m1.size() == m2.size()) {
+ for (Map.Entry<Integer, byte[]> e : m1.entrySet()) {
+ byte[] data = m2.get(e.getKey());
+
+ if (!Arrays.equals(e.getValue(), data))
+ return false;
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ /**
+ * Unmarshals every entry of the given map. An entry that fails to unmarshal is logged and left out
+ * of the result; the failure is not propagated to the caller.
+ *
+ * @param src Source map of marshalled data.
+ * @param marsh Marshaller used to deserialize the data.
+ * @param clsLdr Class loader.
+ * @param clientNode Client node flag; when {@code true}, a failure caused by
+ * {@code ClassNotFoundException} for the {@code CONTINUOUS_PROC} entry is logged as a warning
+ * instead of an error.
+ * @param log Logger.
+ * @return Map of successfully unmarshalled entries under their original keys.
+ */
+ private Map<Integer, Serializable> unmarshalData(
+ Map<Integer, byte[]> src,
+ Marshaller marsh,
+ ClassLoader clsLdr,
+ boolean clientNode,
+ IgniteLogger log
+ ) {
+ Map<Integer, Serializable> res = U.newHashMap(src.size());
+
+ for (Map.Entry<Integer, byte[]> binEntry : src.entrySet()) {
+ try {
+ Serializable compData = marsh.unmarshal(binEntry.getValue(), clsLdr);
+ res.put(binEntry.getKey(), compData);
+ }
+ catch (IgniteCheckedException e) {
+ if (CONTINUOUS_PROC.ordinal() == binEntry.getKey() &&
+ X.hasCause(e, ClassNotFoundException.class) && clientNode)
+ U.warn(log, "Failed to unmarshal continuous query remote filter on client node. Can be ignored.");
+ else
+ U.error(log, "Failed to unmarshal discovery data for component: " + binEntry.getKey(), e);
+ }
+ }
+
+ return res;
+ }
+
+ /**
+ * Marshals every entry of the source map and puts the result into the target map under the same key.
+ * A {@code null} source is ignored. An entry that fails to marshal is logged as an error and
+ * left out of the target; the failure is not propagated to the caller.
+ *
+ * @param src Source map of data to marshal, may be {@code null}.
+ * @param target Target map receiving the marshalled data.
+ * @param marsh Marshaller used to serialize the data.
+ * @param log Logger used to report entries that failed to marshal.
+ */
+ private void marshalData(
+ Map<Integer, Serializable> src,
+ Map<Integer, byte[]> target,
+ Marshaller marsh,
+ IgniteLogger log
+ ) {
+ //may happen if nothing was collected from components,
+ // corresponding map (for common data or for node specific data) left null
+ if (src == null)
+ return;
+
+ for (Map.Entry<Integer, Serializable> entry : src.entrySet()) {
+ try {
+ target.put(entry.getKey(), marsh.marshal(entry.getValue()));
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to marshal discovery data " +
+ "[comp=" + entry.getKey() + ", data=" + entry.getValue() + ']', e);
+ }
+ }
+ }
+
+ /**
+ * Removes from the given map every entry whose byte array equals the array stored under the same
+ * key in the node specific data of any node already present in this packet.
+ * Stops early once the given map becomes empty.
+ *
+ * TODO https://issues.apache.org/jira/browse/IGNITE-4435
+ *
+ * @param discoData Marshalled node specific data to filter; modified in place.
+ */
+ private void filterDuplicatedData(Map<Integer, byte[]> discoData) {
+ for (Map<Integer, byte[]> existingData : nodeSpecificData.values()) {
+ Iterator<Map.Entry<Integer, byte[]>> it = discoData.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Integer, byte[]> discoDataEntry = it.next();
+
+ byte[] curData = existingData.get(discoDataEntry.getKey());
+
+ if (Arrays.equals(curData, discoDataEntry.getValue()))
+ it.remove();
+ }
+
+ if (discoData.isEmpty())
+ break;
+ }
+ }
+
+ /**
+ * Returns {@link DiscoveryDataBag} aware of components with already initialized common data
+ * (e.g. on nodes prior in cluster to the one where this method is called).
+ *
+ * @return New bag created with the joining node ID and the key set of this packet's common data.
+ */
+ public DiscoveryDataBag bagForDataCollection() {
+ return new DiscoveryDataBag(joiningNodeId, commonData.keySet());
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
index 22ffae8..24e3868 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryJoinRequestMessage.java
@@ -17,9 +17,9 @@
package org.apache.ignite.spi.discovery.tcp.messages;
-import java.util.Map;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.S;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
import org.apache.ignite.spi.discovery.tcp.internal.TcpDiscoveryNode;
/**
@@ -33,20 +33,20 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
/** New node that wants to join the topology. */
private final TcpDiscoveryNode node;
- /** Discovery data. */
- private final Map<Integer, byte[]> discoData;
+ /** Discovery data container. */
+ private final DiscoveryDataPacket dataPacket;
/**
* Constructor.
*
* @param node New node that wants to join.
- * @param discoData Discovery data.
+ * @param dataPacket Discovery data.
*/
- public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, Map<Integer, byte[]> discoData) {
+ public TcpDiscoveryJoinRequestMessage(TcpDiscoveryNode node, DiscoveryDataPacket dataPacket) {
super(node.id());
this.node = node;
- this.discoData = discoData;
+ this.dataPacket = dataPacket;
}
/**
@@ -59,10 +59,10 @@ public class TcpDiscoveryJoinRequestMessage extends TcpDiscoveryAbstractMessage
}
/**
- * @return Discovery data.
+ * @return Discovery data container.
*/
- public Map<Integer, byte[]> discoveryData() {
- return discoData;
+ public DiscoveryDataPacket gridDiscoveryData() {
+ return dataPacket;
}
/**
http://git-wip-us.apache.org/repos/asf/ignite/blob/4cd332b7/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
index 80f4565..2c710b5 100644
--- a/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
+++ b/modules/core/src/main/java/org/apache/ignite/spi/discovery/tcp/messages/TcpDiscoveryNodeAddFinishedMessage.java
@@ -21,7 +21,7 @@ import java.util.Map;
import java.util.UUID;
import org.apache.ignite.internal.util.tostring.GridToStringExclude;
import org.apache.ignite.internal.util.typedef.internal.S;
-import org.jetbrains.annotations.Nullable;
+import org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket;
/**
* Sent by coordinator across the ring to finish node add process.
@@ -40,7 +40,7 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
* TcpDiscoveryNodeAddFinishedMessage
*/
@GridToStringExclude
- private Map<UUID, Map<Integer, byte[]>> clientDiscoData;
+ private DiscoveryDataPacket clientDiscoData;
/** */
@GridToStringExclude
@@ -81,17 +81,17 @@ public class TcpDiscoveryNodeAddFinishedMessage extends TcpDiscoveryAbstractMess
/**
* @return Discovery data for joined client.
*/
- public Map<UUID, Map<Integer, byte[]>> clientDiscoData() {
+ public DiscoveryDataPacket clientDiscoData() {
return clientDiscoData;
}
/**
* @param clientDiscoData Discovery data for joined client.
*/
- public void clientDiscoData(@Nullable Map<UUID, Map<Integer, byte[]>> clientDiscoData) {
+ public void clientDiscoData(DiscoveryDataPacket clientDiscoData) {
this.clientDiscoData = clientDiscoData;
- assert clientDiscoData == null || !clientDiscoData.containsKey(nodeId);
+ assert clientDiscoData == null || !clientDiscoData.hasDataFromNode(nodeId);
}
/**