You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/07/24 14:45:04 UTC
incubator-ignite git commit: # IGNITE-900 Refactoring data exchange
API.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-900 [created] 3a9ee47d0
# IGNITE-900 Refactoring data exchange API.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3a9ee47d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3a9ee47d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3a9ee47d
Branch: refs/heads/ignite-900
Commit: 3a9ee47d04b1165bf946b09ec7ffe28a1f1b3aab
Parents: ae148f1
Author: sevdokimov <se...@gridgain.com>
Authored: Fri Jul 24 15:44:23 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Fri Jul 24 15:44:23 2015 +0300
----------------------------------------------------------------------
.../apache/ignite/internal/GridComponent.java | 3 -
.../managers/discovery/DataExchanger.java | 55 +++++++
.../discovery/DataExchangerAdapter.java | 46 ++++++
.../discovery/GridDiscoveryManager.java | 55 +++++--
.../continuous/GridContinuousProcessor.java | 160 +++++++++++--------
5 files changed, 236 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 65e0644..c438cde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -35,9 +35,6 @@ public interface GridComponent {
*/
enum DiscoveryDataExchangeType {
/** */
- CONTINUOUS_PROC,
-
- /** */
CACHE_PROC,
/** */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java
new file mode 100644
index 0000000..75ce47d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java
@@ -0,0 +1,55 @@
+package org.apache.ignite.internal.managers.discovery;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ *
+ */
+public interface DataExchanger<T extends Serializable> {
+ /**
+ * Gets discovery data object that will be sent to new node
+ * during discovery process.
+ *
+ * @param joinedNodeId ID of new node that joins topology.
+ * @return Discovery data object or {@code null} if there is nothing
+ * to send for this component.
+ */
+ @Nullable public T collectDiscoveryData(UUID joinedNodeId);
+
+ /**
+ * @return Topic ID. Must be unique.
+ */
+ public String topicId();
+
+ /**
+ * Receives discovery data object from remote nodes (called
+ * on new node during discovery process).
+ * @param joiningNodeId Joining node ID.
+ * @param rmtNodeId Remote node ID for which data is provided.
+ * @param data Discovery data object or {@code null} if nothing was
+ */
+ public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, T data);
+
+ /**
+ * @return {@code true} if provider should works on last node only.
+ */
+ public boolean lastNodeOnly();
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java
new file mode 100644
index 0000000..3762be2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java
@@ -0,0 +1,46 @@
+package org.apache.ignite.internal.managers.discovery;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+public abstract class DataExchangerAdapter<T extends Serializable> implements DataExchanger<T> {
+ /** */
+ private final String topicId;
+
+ /**
+ * @param topicId Topic id.
+ */
+ protected DataExchangerAdapter(String topicId) {
+ this.topicId = topicId;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public abstract T collectDiscoveryData(UUID joinedNodeId);
+
+ /** {@inheritDoc} */
+ @Override public String topicId() {
+ return topicId;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean lastNodeOnly() {
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 068d374..e887b60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -191,6 +191,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
/** */
private final CountDownLatch startLatch = new CountDownLatch(1);
+ /** */
+ private final ConcurrentMap<String, DataExchanger<?>> dataExchangers = new ConcurrentHashMap8<>();
+
/** @param ctx Context. */
public GridDiscoveryManager(GridKernalContext ctx) {
super(ctx, ctx.config().getDiscoverySpi());
@@ -565,25 +568,51 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
}
+ HashMap<String, Serializable> exchangersMap = new HashMap<>();
+
+ for (DataExchanger<?> exchanger : dataExchangers.values()) {
+ if (!exchanger.lastNodeOnly()) {
+ Serializable o = exchanger.collectDiscoveryData(nodeId);
+
+ exchangersMap.put(exchanger.topicId(), o);
+ }
+ }
+
+ data.put(-1, exchangersMap);
+
return data;
}
@Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) {
for (Map.Entry<Integer, Serializable> e : data.entrySet()) {
- GridComponent comp = null;
+ if (e.getKey() == -1) {
+ Map<String, Serializable> map = (Map<String, Serializable>)e.getValue();
- for (GridComponent c : ctx.components()) {
- if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) {
- comp = c;
+ for (Map.Entry<String, Serializable> entry : map.entrySet()) {
+ DataExchanger exchanger = dataExchangers.get(entry.getKey());
- break;
+ if (exchanger != null) {
+ if (!exchanger.lastNodeOnly() || !joiningNodeId.equals(localNode().id()))
+ exchanger.onDiscoveryDataReceived(joiningNodeId, nodeId, entry.getValue());
+ }
}
}
+ else {
+ GridComponent comp = null;
- if (comp != null)
- comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue());
- else
- U.warn(log, "Received discovery data for unknown component: " + e.getKey());
+ for (GridComponent c : ctx.components()) {
+ if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) {
+ comp = c;
+
+ break;
+ }
+ }
+
+ if (comp != null)
+ comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue());
+ else
+ U.warn(log, "Received discovery data for unknown component: " + e.getKey());
+ }
}
}
});
@@ -1652,6 +1681,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
}
/**
+ * @param dataExchanger Data exchanger.
+ */
+ public void registerDataExchanger(DataExchanger<?> dataExchanger) {
+ if (dataExchangers.putIfAbsent(dataExchanger.topicId(), dataExchanger) != null)
+ throw new IllegalArgumentException("Duplicate topicId: " + dataExchanger.topicId());
+ }
+
+ /**
* @param nodeId Node ID to fail.
* @param warning Warning message to be shown on all nodes.
*/
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index daa9494..7342b6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -244,6 +244,74 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
});
+ ctx.discovery().registerDataExchanger(new DataExchangerAdapter<DiscoveryData>("GridContinuousProcessor") {
+ @Nullable @Override public DiscoveryData collectDiscoveryData(UUID joinedNodeId) {
+ if (joinedNodeId.equals(ctx.localNodeId()) && locInfos.isEmpty())
+ return null;
+
+ DiscoveryData data = new DiscoveryData(ctx.localNodeId());
+
+ // Collect listeners information (will be sent to joining node during discovery process).
+ for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
+ UUID routineId = e.getKey();
+ LocalRoutineInfo info = e.getValue();
+
+ data.addItem(new DiscoveryDataItem(routineId,
+ info.prjPred,
+ info.hnd,
+ info.bufSize,
+ info.interval,
+ info.autoUnsubscribe));
+ }
+
+ return data;
+ }
+
+ @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, DiscoveryData data) {
+ if (ctx.isDaemon() || data == null)
+ return;
+
+ for (DiscoveryDataItem item : data.items) {
+ try {
+ if (item.prjPred != null)
+ ctx.resource().injectGeneric(item.prjPred);
+
+ // Register handler only if local node passes projection predicate.
+ if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
+ if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
+ item.autoUnsubscribe, false))
+ item.hnd.onListenerRegistered(item.routineId, ctx);
+ }
+ }
+ catch (IgniteCheckedException e) {
+ U.error(log, "Failed to register continuous handler.", e);
+ }
+ }
+ }
+ });
+
+ ctx.discovery().registerDataExchanger(new DataExchangerAdapter<SharedRoutineInfo>(
+ "GridContinuousProcessor.shared") {
+ @Nullable @Override public SharedRoutineInfo collectDiscoveryData(UUID joinedNodeId) {
+ return new SharedRoutineInfo(clientInfos);
+ }
+
+ @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, SharedRoutineInfo data) {
+ for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
+ Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
+
+ if (map == null) {
+ map = new HashMap<>();
+
+ clientInfos.put(entry.getKey(), map);
+ }
+
+ map.putAll(entry.getValue());
+ }
+ }
+ });
+
+
if (log.isDebugEnabled())
log.debug("Continuous processor started.");
}
@@ -318,71 +386,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
log.debug("Continuous processor stopped.");
}
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
- return DiscoveryDataExchangeType.CONTINUOUS_PROC;
- }
-
- /** {@inheritDoc} */
- @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
- if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
- DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
-
- // Collect listeners information (will be sent to joining node during discovery process).
- for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
- UUID routineId = e.getKey();
- LocalRoutineInfo info = e.getValue();
-
- data.addItem(new DiscoveryDataItem(routineId,
- info.prjPred,
- info.hnd,
- info.bufSize,
- info.interval,
- info.autoUnsubscribe));
- }
-
- return data;
- }
-
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
- DiscoveryData data = (DiscoveryData)obj;
-
- if (!ctx.isDaemon() && data != null) {
- for (DiscoveryDataItem item : data.items) {
- try {
- if (item.prjPred != null)
- ctx.resource().injectGeneric(item.prjPred);
-
- // Register handler only if local node passes projection predicate.
- if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
- if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
- item.autoUnsubscribe, false))
- item.hnd.onListenerRegistered(item.routineId, ctx);
- }
- }
- catch (IgniteCheckedException e) {
- U.error(log, "Failed to register continuous handler.", e);
- }
- }
-
- for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
- Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
-
- if (map == null) {
- map = new HashMap<>();
-
- clientInfos.put(entry.getKey(), map);
- }
-
- map.putAll(entry.getValue());
- }
- }
- }
-
/**
* Callback invoked when cache is started.
*
@@ -1260,6 +1263,26 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
}
/**
+ *
+ */
+ private static class SharedRoutineInfo implements Serializable {
+ /** */
+ private final Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;
+
+ /**
+ * @param clientInfos Client infos.
+ */
+ SharedRoutineInfo(Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
+ this.clientInfos = clientInfos;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(SharedRoutineInfo.class, this);
+ }
+ }
+
+ /**
* Discovery data.
*/
private static class DiscoveryData implements Externalizable {
@@ -1285,15 +1308,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
/**
* @param nodeId Node ID.
- * @param clientInfos Client information.
*/
- DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
+ DiscoveryData(UUID nodeId) {
assert nodeId != null;
this.nodeId = nodeId;
- this.clientInfos = clientInfos;
-
items = new ArrayList<>();
}
@@ -1308,14 +1328,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
@Override public void writeExternal(ObjectOutput out) throws IOException {
U.writeUuid(out, nodeId);
U.writeCollection(out, items);
- U.writeMap(out, clientInfos);
}
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
nodeId = U.readUuid(in);
items = U.readCollection(in);
- clientInfos = U.readMap(in);
}
/** {@inheritDoc} */