You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/07/24 14:45:04 UTC

incubator-ignite git commit: # IGNITE-900 Refactoring data exchange API.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-900 [created] 3a9ee47d0


# IGNITE-900 Refactoring data exchange API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/3a9ee47d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/3a9ee47d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/3a9ee47d

Branch: refs/heads/ignite-900
Commit: 3a9ee47d04b1165bf946b09ec7ffe28a1f1b3aab
Parents: ae148f1
Author: sevdokimov <se...@gridgain.com>
Authored: Fri Jul 24 15:44:23 2015 +0300
Committer: sevdokimov <se...@gridgain.com>
Committed: Fri Jul 24 15:44:23 2015 +0300

----------------------------------------------------------------------
 .../apache/ignite/internal/GridComponent.java   |   3 -
 .../managers/discovery/DataExchanger.java       |  55 +++++++
 .../discovery/DataExchangerAdapter.java         |  46 ++++++
 .../discovery/GridDiscoveryManager.java         |  55 +++++--
 .../continuous/GridContinuousProcessor.java     | 160 +++++++++++--------
 5 files changed, 236 insertions(+), 83 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
index 65e0644..c438cde 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridComponent.java
@@ -35,9 +35,6 @@ public interface GridComponent {
      */
     enum DiscoveryDataExchangeType {
         /** */
-        CONTINUOUS_PROC,
-
-        /** */
         CACHE_PROC,
 
         /** */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java
new file mode 100644
index 0000000..75ce47d
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchanger.java
@@ -0,0 +1,55 @@
+package org.apache.ignite.internal.managers.discovery;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+/**
+ * Exchanges a piece of data of type {@code T} between nodes during the discovery process, identified by a topic ID.
+ */
+public interface DataExchanger<T extends Serializable> {
+    /**
+     * Gets discovery data object that will be sent to new node
+     * during discovery process.
+     *
+     * @param joinedNodeId ID of new node that joins topology.
+     * @return Discovery data object or {@code null} if there is nothing
+     *      to send for this component.
+     */
+    @Nullable public T collectDiscoveryData(UUID joinedNodeId);
+
+    /**
+     * @return Topic ID. Must be unique among all registered exchangers.
+     */
+    public String topicId();
+
+    /**
+     * Receives discovery data object from remote nodes (called
+     * on new node during discovery process).
+     * @param joiningNodeId Joining node ID.
+     * @param rmtNodeId Remote node ID for which data is provided.
+     * @param data Discovery data object or {@code null} if nothing was collected for this topic.
+     */
+    public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, T data);
+
+    /**
+     * @return {@code true} if provider should work on last node only.
+     */
+    public boolean lastNodeOnly();
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java
new file mode 100644
index 0000000..3762be2
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/DataExchangerAdapter.java
@@ -0,0 +1,46 @@
+package org.apache.ignite.internal.managers.discovery;/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.util.*;
+
+public abstract class DataExchangerAdapter<T extends Serializable> implements DataExchanger<T> {
+    /** Topic ID returned by {@link #topicId()}. */
+    private final String topicId;
+
+    /**
+     * @param topicId Topic ID that {@link #topicId()} will return.
+     */
+    protected DataExchangerAdapter(String topicId) {
+        this.topicId = topicId;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public abstract T collectDiscoveryData(UUID joinedNodeId);
+
+    /** {@inheritDoc} */
+    @Override public String topicId() {
+        return topicId;
+    }
+
+    /** {@inheritDoc} This adapter always returns {@code false}. */
+    @Override public boolean lastNodeOnly() {
+        return false;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
index 068d374..e887b60 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/managers/discovery/GridDiscoveryManager.java
@@ -191,6 +191,9 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     /** */
     private final CountDownLatch startLatch = new CountDownLatch(1);
 
+    /** */
+    private final ConcurrentMap<String, DataExchanger<?>> dataExchangers = new ConcurrentHashMap8<>();
+
     /** @param ctx Context. */
     public GridDiscoveryManager(GridKernalContext ctx) {
         super(ctx, ctx.config().getDiscoverySpi());
@@ -565,25 +568,51 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
                     }
                 }
 
+                HashMap<String, Serializable> exchangersMap = new HashMap<>();
+
+                for (DataExchanger<?> exchanger : dataExchangers.values()) {
+                    if (!exchanger.lastNodeOnly()) {
+                        Serializable o = exchanger.collectDiscoveryData(nodeId);
+
+                        exchangersMap.put(exchanger.topicId(), o);
+                    }
+                }
+
+                data.put(-1, exchangersMap);
+
                 return data;
             }
 
             @Override public void onExchange(UUID joiningNodeId, UUID nodeId, Map<Integer, Serializable> data) {
                 for (Map.Entry<Integer, Serializable> e : data.entrySet()) {
-                    GridComponent comp = null;
+                    if (e.getKey() == -1) {
+                        Map<String, Serializable> map = (Map<String, Serializable>)e.getValue();
 
-                    for (GridComponent c : ctx.components()) {
-                        if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) {
-                            comp = c;
+                        for (Map.Entry<String, Serializable> entry : map.entrySet()) {
+                            DataExchanger exchanger = dataExchangers.get(entry.getKey());
 
-                            break;
+                            if (exchanger != null) {
+                                if (!exchanger.lastNodeOnly() || !joiningNodeId.equals(localNode().id()))
+                                    exchanger.onDiscoveryDataReceived(joiningNodeId, nodeId, entry.getValue());
+                            }
                         }
                     }
+                    else {
+                        GridComponent comp = null;
 
-                    if (comp != null)
-                        comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue());
-                    else
-                        U.warn(log, "Received discovery data for unknown component: " + e.getKey());
+                        for (GridComponent c : ctx.components()) {
+                            if (c.discoveryDataType() != null && c.discoveryDataType().ordinal() == e.getKey()) {
+                                comp = c;
+
+                                break;
+                            }
+                        }
+
+                        if (comp != null)
+                            comp.onDiscoveryDataReceived(joiningNodeId, nodeId, e.getValue());
+                        else
+                            U.warn(log, "Received discovery data for unknown component: " + e.getKey());
+                    }
                 }
             }
         });
@@ -1652,6 +1681,14 @@ public class GridDiscoveryManager extends GridManagerAdapter<DiscoverySpi> {
     }
 
     /**
+     * @param dataExchanger Data exchanger to register; {@link IllegalArgumentException} is thrown if its topic ID is taken.
+     */
+    public void registerDataExchanger(DataExchanger<?> dataExchanger) {
+        if (dataExchangers.putIfAbsent(dataExchanger.topicId(), dataExchanger) != null)
+            throw new IllegalArgumentException("Duplicate topicId: " + dataExchanger.topicId());
+    }
+
+    /**
      * @param nodeId Node ID to fail.
      * @param warning Warning message to be shown on all nodes.
      */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/3a9ee47d/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
index daa9494..7342b6a 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/processors/continuous/GridContinuousProcessor.java
@@ -244,6 +244,74 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             }
         });
 
+        ctx.discovery().registerDataExchanger(new DataExchangerAdapter<DiscoveryData>("GridContinuousProcessor") {
+            @Nullable @Override public DiscoveryData collectDiscoveryData(UUID joinedNodeId) {
+                if (joinedNodeId.equals(ctx.localNodeId()) && locInfos.isEmpty())
+                    return null;
+
+                DiscoveryData data = new DiscoveryData(ctx.localNodeId());
+
+                // Collect listeners information (will be sent to joining node during discovery process).
+                for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
+                    UUID routineId = e.getKey();
+                    LocalRoutineInfo info = e.getValue();
+
+                    data.addItem(new DiscoveryDataItem(routineId,
+                        info.prjPred,
+                        info.hnd,
+                        info.bufSize,
+                        info.interval,
+                        info.autoUnsubscribe));
+                }
+
+                return data;
+            }
+
+            @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, DiscoveryData data) {
+                if (ctx.isDaemon() || data == null)
+                    return;
+
+                for (DiscoveryDataItem item : data.items) {
+                    try {
+                        if (item.prjPred != null)
+                            ctx.resource().injectGeneric(item.prjPred);
+
+                        // Register handler only if local node passes projection predicate.
+                        if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
+                            if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
+                                item.autoUnsubscribe, false))
+                                item.hnd.onListenerRegistered(item.routineId, ctx);
+                        }
+                    }
+                    catch (IgniteCheckedException e) {
+                        U.error(log, "Failed to register continuous handler.", e);
+                    }
+                }
+            }
+        });
+
+        ctx.discovery().registerDataExchanger(new DataExchangerAdapter<SharedRoutineInfo>(
+            "GridContinuousProcessor.shared") {
+            @Nullable @Override public SharedRoutineInfo collectDiscoveryData(UUID joinedNodeId) {
+                return new SharedRoutineInfo(clientInfos);
+            }
+
+            @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, SharedRoutineInfo data) {
+                for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
+                    Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
+
+                    if (map == null) {
+                        map = new HashMap<>();
+
+                        clientInfos.put(entry.getKey(), map);
+                    }
+
+                    map.putAll(entry.getValue());
+                }
+            }
+        });
+
+
         if (log.isDebugEnabled())
             log.debug("Continuous processor started.");
     }
@@ -318,71 +386,6 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
             log.debug("Continuous processor stopped.");
     }
 
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
-        return DiscoveryDataExchangeType.CONTINUOUS_PROC;
-    }
-
-    /** {@inheritDoc} */
-    @Override @Nullable public Serializable collectDiscoveryData(UUID nodeId) {
-        if (!nodeId.equals(ctx.localNodeId()) || !locInfos.isEmpty()) {
-            DiscoveryData data = new DiscoveryData(ctx.localNodeId(), clientInfos);
-
-            // Collect listeners information (will be sent to joining node during discovery process).
-            for (Map.Entry<UUID, LocalRoutineInfo> e : locInfos.entrySet()) {
-                UUID routineId = e.getKey();
-                LocalRoutineInfo info = e.getValue();
-
-                data.addItem(new DiscoveryDataItem(routineId,
-                    info.prjPred,
-                    info.hnd,
-                    info.bufSize,
-                    info.interval,
-                    info.autoUnsubscribe));
-            }
-
-            return data;
-        }
-
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onDiscoveryDataReceived(UUID joiningNodeId, UUID rmtNodeId, Serializable obj) {
-        DiscoveryData data = (DiscoveryData)obj;
-
-        if (!ctx.isDaemon() && data != null) {
-            for (DiscoveryDataItem item : data.items) {
-                try {
-                    if (item.prjPred != null)
-                        ctx.resource().injectGeneric(item.prjPred);
-
-                    // Register handler only if local node passes projection predicate.
-                    if (item.prjPred == null || item.prjPred.apply(ctx.discovery().localNode())) {
-                        if (registerHandler(data.nodeId, item.routineId, item.hnd, item.bufSize, item.interval,
-                            item.autoUnsubscribe, false))
-                            item.hnd.onListenerRegistered(item.routineId, ctx);
-                    }
-                }
-                catch (IgniteCheckedException e) {
-                    U.error(log, "Failed to register continuous handler.", e);
-                }
-            }
-
-            for (Map.Entry<UUID, Map<UUID, LocalRoutineInfo>> entry : data.clientInfos.entrySet()) {
-                Map<UUID, LocalRoutineInfo> map = clientInfos.get(entry.getKey());
-
-                if (map == null) {
-                    map = new HashMap<>();
-
-                    clientInfos.put(entry.getKey(), map);
-                }
-
-                map.putAll(entry.getValue());
-            }
-        }
-    }
-
     /**
      * Callback invoked when cache is started.
      *
@@ -1260,6 +1263,26 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
     }
 
     /**
+     * Serializable wrapper around the client routine infos map sent under the "GridContinuousProcessor.shared" topic.
+     */
+    private static class SharedRoutineInfo implements Serializable {
+        /** Wrapped map, stored by reference (presumably client node ID to routine ID to info; confirm). */
+        private final Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos;
+
+        /**
+         * @param clientInfos Client infos map to wrap; it is not copied.
+         */
+        SharedRoutineInfo(Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
+            this.clientInfos = clientInfos;
+        }
+
+        /** {@inheritDoc} */
+        @Override public String toString() {
+            return S.toString(SharedRoutineInfo.class, this);
+        }
+    }
+
+    /**
      * Discovery data.
      */
     private static class DiscoveryData implements Externalizable {
@@ -1285,15 +1308,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
 
         /**
          * @param nodeId Node ID.
-         * @param clientInfos Client information.
          */
-        DiscoveryData(UUID nodeId, Map<UUID, Map<UUID, LocalRoutineInfo>> clientInfos) {
+        DiscoveryData(UUID nodeId) {
             assert nodeId != null;
 
             this.nodeId = nodeId;
 
-            this.clientInfos = clientInfos;
-
             items = new ArrayList<>();
         }
 
@@ -1308,14 +1328,12 @@ public class GridContinuousProcessor extends GridProcessorAdapter {
         @Override public void writeExternal(ObjectOutput out) throws IOException {
             U.writeUuid(out, nodeId);
             U.writeCollection(out, items);
-            U.writeMap(out, clientInfos);
         }
 
         /** {@inheritDoc} */
         @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
             nodeId = U.readUuid(in);
             items = U.readCollection(in);
-            clientInfos = U.readMap(in);
         }
 
         /** {@inheritDoc} */