You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/27 14:19:56 UTC

[20/27] ignite git commit: IGNITE-1310: Platforms: moved cluster group to Ignite.

IGNITE-1310: Platforms: moved cluster group to Ignite.


Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/712b29c8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/712b29c8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/712b29c8

Branch: refs/heads/ignite-1124
Commit: 712b29c856614bd914e6bf35c5203aab624c93e7
Parents: 7c2c02b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 13:29:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 13:29:08 2015 +0300

----------------------------------------------------------------------
 .../platform/cluster/PlatformClusterGroup.java  | 330 +++++++++++++++++++
 1 file changed, 330 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/ignite/blob/712b29c8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
new file mode 100644
index 0000000..1f2a002
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ * Interop projection: dispatches platform operation codes to the wrapped {@link ClusterGroupEx}.
+ */
+@SuppressWarnings({"UnusedDeclaration"})
+public class PlatformClusterGroup extends PlatformAbstractTarget {
+    /** Out op: writes all metadata through the platform context. */
+    private static final int OP_ALL_METADATA = 1;
+
+    /** In op returning object: new group built with {@code forAttribute} from two strings read in order. */
+    private static final int OP_FOR_ATTRIBUTE = 2;
+
+    /** In op returning object: new group built with {@code forCacheNodes} for the cache name read. */
+    private static final int OP_FOR_CACHE = 3;
+
+    /** In op returning object: new group built with {@code forClientNodes} for the cache name read. */
+    private static final int OP_FOR_CLIENT = 4;
+
+    /** In op returning object: new group built with {@code forDataNodes} for the cache name read. */
+    private static final int OP_FOR_DATA = 5;
+
+    /** In op returning object: new group built with {@code forHost} for the node looked up by the ID read. */
+    private static final int OP_FOR_HOST = 6;
+
+    /** In op returning object: new group built with {@code forNodeIds} for the collection of IDs read. */
+    private static final int OP_FOR_NODE_IDS = 7;
+
+    /** In-out op: writes metadata for the type ID read. */
+    private static final int OP_METADATA = 8;
+
+    /** Out op: writes metrics of the wrapped projection. */
+    private static final int OP_METRICS = 9;
+
+    /** In-out op: writes metrics of the sub-projection restricted to the node IDs read. */
+    private static final int OP_METRICS_FILTERED = 10;
+
+    /** In-out op: writes metrics of a single node looked up through discovery, or {@code null} if not fresh. */
+    private static final int OP_NODE_METRICS = 11;
+
+    /** In-out op: writes projection nodes when the current topology version is newer than the one read. */
+    private static final int OP_NODES = 12;
+
+    /** In op: pings the node with the ID read. */
+    private static final int OP_PING_NODE = 13;
+
+    /** In-out op: writes the nodes of the topology version read. */
+    private static final int OP_TOPOLOGY = 14;
+
+    /** Wrapped projection that every operation of this target delegates to. */
+    private final ClusterGroupEx prj;
+
+    /**
+     * Creates a target bound to the given projection.
+     *
+     * @param platformCtx Platform context, forwarded to the parent target.
+     * @param prj Projection to wrap.
+     */
+    public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) {
+        super(platformCtx);
+
+        this.prj = prj;
+    }
+
+    /** {@inheritDoc} Handles {@code OP_METRICS} and {@code OP_ALL_METADATA}; other types go to {@code throwUnsupported}. */
+    @SuppressWarnings("deprecation")
+    @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+        switch (type) {
+            case OP_METRICS:
+                platformCtx.writeClusterMetrics(writer, prj.metrics());
+
+                break;
+
+            case OP_ALL_METADATA:
+                platformCtx.writeAllMetadata(writer);
+
+                break;
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /** {@inheritDoc} Every supported case first reads its arguments from the reader, then writes a reply. */
+    @SuppressWarnings({"ConstantConditions", "deprecation"})
+    @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+        Object obj) throws IgniteCheckedException {
+        switch (type) {
+            case OP_METRICS_FILTERED: {
+                Collection<UUID> ids = PlatformUtils.readCollection(reader);
+
+                platformCtx.writeClusterMetrics(writer, prj.forNodeIds(ids).metrics());
+
+                break;
+            }
+
+            case OP_NODES: {
+                long oldTopVer = reader.readLong();
+
+                long curTopVer = platformCtx.kernalContext().discovery().topologyVersion();
+
+                if (curTopVer > oldTopVer) {
+                    writer.writeBoolean(true);
+
+                    writer.writeLong(curTopVer);
+
+                    // At this moment topology version might have advanced, and due to this race
+                    // we return outdated top ver to the caller. But this race is benign, the only
+                    // possible side effect is that the user will re-request nodes and we will return
+                    // the same set of nodes but with more recent topology version.
+                    Collection<ClusterNode> nodes = prj.nodes();
+
+                    platformCtx.writeNodes(writer, nodes);
+                }
+                else
+                    // No discovery events since last invocation.
+                    writer.writeBoolean(false);
+
+                break;
+            }
+
+            case OP_NODE_METRICS: {
+                UUID nodeId = reader.readUuid();
+
+                long lastUpdateTime = reader.readLong();
+
+                // Ask discovery because node might have been filtered out of current projection.
+                ClusterNode node = platformCtx.kernalContext().discovery().node(nodeId);
+
+                ClusterMetrics metrics = null;
+
+                if (node != null) {
+                    ClusterMetrics metrics0 = node.metrics();
+
+                    long triggerTime = lastUpdateTime + platformCtx.kernalContext().config().getMetricsUpdateFrequency();
+                    // Keep metrics only if updated after the trigger time; otherwise null is passed on.
+                    metrics = metrics0.getLastUpdateTime() > triggerTime ? metrics0 : null;
+                }
+
+                platformCtx.writeClusterMetrics(writer, metrics);
+
+                break;
+            }
+
+            case OP_METADATA: {
+                int typeId = reader.readInt();
+
+                platformCtx.writeMetadata(writer, typeId);
+
+                break;
+            }
+
+            case OP_TOPOLOGY: {
+                long topVer = reader.readLong();
+
+                platformCtx.writeNodes(writer, topology(topVer));
+
+                break;
+            }
+
+            default:
+                throwUnsupported(type);
+        }
+    }
+
+    /** {@inheritDoc} Only {@code OP_PING_NODE} is handled; any other type goes to {@code throwUnsupported}. */
+    @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_PING_NODE:
+                return pingNode(reader.readUuid()) ? TRUE : FALSE;
+        }
+
+        return throwUnsupported(type);
+    }
+
+    /** {@inheritDoc} Every supported case wraps a derived projection in a new {@code PlatformClusterGroup}. */
+    @Override protected Object processInOpObject(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        switch (type) {
+            case OP_FOR_NODE_IDS: {
+                Collection<UUID> ids = PlatformUtils.readCollection(reader);
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forNodeIds(ids));
+            }
+
+            case OP_FOR_ATTRIBUTE:
+                return new PlatformClusterGroup(platformCtx,
+                    (ClusterGroupEx)prj.forAttribute(reader.readString(), reader.readString()));
+
+            case OP_FOR_CACHE: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forCacheNodes(cacheName));
+            }
+
+            case OP_FOR_CLIENT: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forClientNodes(cacheName));
+            }
+
+            case OP_FOR_DATA: {
+                String cacheName = reader.readString();
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDataNodes(cacheName));
+            }
+
+            case OP_FOR_HOST: {
+                UUID nodeId = reader.readUuid();
+
+                ClusterNode node = prj.node(nodeId);
+
+                return new PlatformClusterGroup(platformCtx, (ClusterGroupEx) prj.forHost(node));
+            }
+
+            default:
+                return throwUnsupported(type);
+        }
+    }
+
+    /**
+     * @param exclude Target whose wrapped projection is passed to {@code forOthers}.
+     * @return New target wrapping the projection returned by {@code forOthers}.
+     */
+    public PlatformClusterGroup forOthers(PlatformClusterGroup exclude) {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOthers(exclude.prj));
+    }
+
+    /**
+     * @return New target wrapping the projection returned by {@code forRemotes}.
+     */
+    public PlatformClusterGroup forRemotes() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes());
+    }
+
+    /**
+     * @return New target wrapping the projection returned by {@code forDaemons}.
+     */
+    public PlatformClusterGroup forDaemons() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDaemons());
+    }
+
+    /**
+     * @return New target wrapping the projection returned by {@code forRandom}.
+     */
+    public PlatformClusterGroup forRandom() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRandom());
+    }
+
+    /**
+     * @return New target wrapping the projection returned by {@code forOldest}.
+     */
+    public PlatformClusterGroup forOldest() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOldest());
+    }
+
+    /**
+     * @return New target wrapping the projection returned by {@code forYoungest}.
+     */
+    public PlatformClusterGroup forYoungest() {
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forYoungest());
+    }
+
+    /**
+     * @return Wrapped projection (the instance passed to the constructor).
+     */
+    public ClusterGroupEx projection() {
+        return prj;
+    }
+
+    /**
+     * Resets local I/O, job, and task execution metrics; the projection must be an {@link IgniteCluster}.
+     */
+    public void resetMetrics() {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        ((IgniteCluster)prj).resetMetrics();
+    }
+
+    /**
+     * Pings the node with the given ID via {@code IgniteCluster.pingNode} and returns that call's result.
+     */
+    private boolean pingNode(UUID nodeId) {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        return ((IgniteCluster)prj).pingNode(nodeId);
+    }
+
+    /**
+     * Gets a topology by version. Returns {@code null} if topology history storage doesn't contain
+     * specified topology version (history currently keeps last {@code 1000} snapshots).
+     *
+     * @param topVer Topology version.
+     * @return Collection of grid nodes represented by the specified topology version,
+     * if it is present in history storage, {@code null} otherwise.
+     * @throws UnsupportedOperationException If underlying SPI implementation does not support
+     *      topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
+     *      supports topology history.
+     */
+    private Collection<ClusterNode> topology(long topVer) {
+        assert prj instanceof IgniteCluster; // Can only be invoked on top-level cluster group.
+
+        return ((IgniteCluster)prj).topology(topVer);
+    }
+}