You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by sb...@apache.org on 2015/08/27 14:19:56 UTC
[20/27] ignite git commit: IGNITE-1310: Platforms: moved cluster group to Ignite.
IGNITE-1310: Platforms: moved cluster group to Ignite.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/712b29c8
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/712b29c8
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/712b29c8
Branch: refs/heads/ignite-1124
Commit: 712b29c856614bd914e6bf35c5203aab624c93e7
Parents: 7c2c02b
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Thu Aug 27 13:29:08 2015 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Thu Aug 27 13:29:08 2015 +0300
----------------------------------------------------------------------
.../platform/cluster/PlatformClusterGroup.java | 330 +++++++++++++++++++
1 file changed, 330 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/712b29c8/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
----------------------------------------------------------------------
diff --git a/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
new file mode 100644
index 0000000..1f2a002
--- /dev/null
+++ b/modules/platform/src/main/java/org/apache/ignite/internal/processors/platform/cluster/PlatformClusterGroup.java
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.platform.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.internal.cluster.*;
+import org.apache.ignite.internal.portable.*;
+import org.apache.ignite.internal.processors.platform.*;
+import org.apache.ignite.internal.processors.platform.utils.*;
+
+import java.util.*;
+
+/**
+ * Interop target that wraps a cluster group (projection) and serves operations on it by integer operation code.
+ */
+@SuppressWarnings({"UnusedDeclaration"})
+public class PlatformClusterGroup extends PlatformAbstractTarget {
+ /** Out operation: write all metadata through the platform context. */
+ private static final int OP_ALL_METADATA = 1;
+
+ /** In-object operation: new group from {@code forAttribute}; two strings are read from the stream. */
+ private static final int OP_FOR_ATTRIBUTE = 2;
+
+ /** In-object operation: new group from {@code forCacheNodes} for the cache name read from the stream. */
+ private static final int OP_FOR_CACHE = 3;
+
+ /** In-object operation: new group from {@code forClientNodes} for the cache name read from the stream. */
+ private static final int OP_FOR_CLIENT = 4;
+
+ /** In-object operation: new group from {@code forDataNodes} for the cache name read from the stream. */
+ private static final int OP_FOR_DATA = 5;
+
+ /** In-object operation: new group from {@code forHost} for the node whose ID is read from the stream. */
+ private static final int OP_FOR_HOST = 6;
+
+ /** In-object operation: new group from {@code forNodeIds} for the ID collection read from the stream. */
+ private static final int OP_FOR_NODE_IDS = 7;
+
+ /** In-out operation: write metadata for the type ID read from the stream. */
+ private static final int OP_METADATA = 8;
+
+ /** Out operation: write metrics of the wrapped cluster group. */
+ private static final int OP_METRICS = 9;
+
+ /** In-out operation: write metrics of the sub-group restricted to the node IDs read from the stream. */
+ private static final int OP_METRICS_FILTERED = 10;
+
+ /** In-out operation: write metrics of a single node, or {@code null} when they are not newer than the caller's. */
+ private static final int OP_NODE_METRICS = 11;
+
+ /** In-out operation: write the group's nodes when the topology version is greater than the one read. */
+ private static final int OP_NODES = 12;
+
+ /** In operation: ping the node whose ID is read from the stream. */
+ private static final int OP_PING_NODE = 13;
+
+ /** In-out operation: write the nodes of the topology version read from the stream. */
+ private static final int OP_TOPOLOGY = 14;
+
+ /** Wrapped cluster group (projection). */
+ private final ClusterGroupEx prj;
+
+ /**
+ * Creates an interop target around the given cluster group.
+ *
+ * @param platformCtx Platform context, passed on to the parent target.
+ * @param prj Cluster group (projection) wrapped by this target.
+ */
+ public PlatformClusterGroup(PlatformContext platformCtx, ClusterGroupEx prj) {
+ super(platformCtx);
+
+ this.prj = prj;
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Serves {@code OP_METRICS} and {@code OP_ALL_METADATA}; every other operation type is handed
+     * to {@code throwUnsupported}.
+     */
+    @SuppressWarnings("deprecation")
+    @Override protected void processOutOp(int type, PortableRawWriterEx writer) throws IgniteCheckedException {
+        if (type == OP_METRICS) {
+            // Metrics of the wrapped cluster group.
+            ClusterMetrics grpMetrics = prj.metrics();
+
+            platformCtx.writeClusterMetrics(writer, grpMetrics);
+        }
+        else if (type == OP_ALL_METADATA)
+            platformCtx.writeAllMetadata(writer);
+        else
+            throwUnsupported(type);
+    }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings({"ConstantConditions", "deprecation"})
+ @Override protected void processInOutOp(int type, PortableRawReaderEx reader, PortableRawWriterEx writer,
+ Object obj) throws IgniteCheckedException {
+ switch (type) {
+ case OP_METRICS_FILTERED: {
+ Collection<UUID> ids = PlatformUtils.readCollection(reader);
+
+ platformCtx.writeClusterMetrics(writer, prj.forNodeIds(ids).metrics());
+
+ break;
+ }
+
+ case OP_NODES: {
+ long oldTopVer = reader.readLong();
+
+ long curTopVer = platformCtx.kernalContext().discovery().topologyVersion();
+
+ if (curTopVer > oldTopVer) {
+ writer.writeBoolean(true);
+
+ writer.writeLong(curTopVer);
+
+ // At this moment topology version might have advanced, and due to this race
+ // we return outdated top ver to the callee. But this race is benign, the only
+ // possible side effect is that the user will re-request nodes and we will return
+ // the same set of nodes but with more recent topology version.
+ Collection<ClusterNode> nodes = prj.nodes();
+
+ platformCtx.writeNodes(writer, nodes);
+ }
+ else
+ // No discovery events since last invocation.
+ writer.writeBoolean(false);
+
+ break;
+ }
+
+ case OP_NODE_METRICS: {
+ UUID nodeId = reader.readUuid();
+
+ long lastUpdateTime = reader.readLong();
+
+ // Ask discovery because node might have been filtered out of current projection.
+ ClusterNode node = platformCtx.kernalContext().discovery().node(nodeId);
+
+ ClusterMetrics metrics = null;
+
+ if (node != null) {
+ ClusterMetrics metrics0 = node.metrics();
+
+ long triggerTime = lastUpdateTime + platformCtx.kernalContext().config().getMetricsUpdateFrequency();
+
+ metrics = metrics0.getLastUpdateTime() > triggerTime ? metrics0 : null;
+ }
+
+ platformCtx.writeClusterMetrics(writer, metrics);
+
+ break;
+ }
+
+ case OP_METADATA: {
+ int typeId = reader.readInt();
+
+ platformCtx.writeMetadata(writer, typeId);
+
+ break;
+ }
+
+ case OP_TOPOLOGY: {
+ long topVer = reader.readLong();
+
+ platformCtx.writeNodes(writer, topology(topVer));
+
+ break;
+ }
+
+ default:
+ throwUnsupported(type);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override protected int processInOp(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+ switch (type) {
+ case OP_PING_NODE:
+ return pingNode(reader.readUuid()) ? TRUE : FALSE;
+ }
+
+ return throwUnsupported(type);
+ }
+
+    /**
+     * {@inheritDoc}
+     * <p>
+     * Each supported operation derives a narrower cluster group from the wrapped one and returns it
+     * wrapped in a new {@code PlatformClusterGroup}; every other operation type is handed to
+     * {@code throwUnsupported}.
+     */
+    @Override protected Object processInOpObject(int type, PortableRawReaderEx reader) throws IgniteCheckedException {
+        ClusterGroup derived;
+
+        switch (type) {
+            case OP_FOR_NODE_IDS: {
+                Collection<UUID> nodeIds = PlatformUtils.readCollection(reader);
+
+                derived = prj.forNodeIds(nodeIds);
+
+                break;
+            }
+
+            case OP_FOR_ATTRIBUTE: {
+                // Two strings are read in stream order and passed on in that same order.
+                String first = reader.readString();
+                String second = reader.readString();
+
+                derived = prj.forAttribute(first, second);
+
+                break;
+            }
+
+            case OP_FOR_CACHE:
+                derived = prj.forCacheNodes(reader.readString());
+
+                break;
+
+            case OP_FOR_CLIENT:
+                derived = prj.forClientNodes(reader.readString());
+
+                break;
+
+            case OP_FOR_DATA:
+                derived = prj.forDataNodes(reader.readString());
+
+                break;
+
+            case OP_FOR_HOST: {
+                // The node is looked up within the wrapped group by the ID read from the stream.
+                ClusterNode hostNode = prj.node(reader.readUuid());
+
+                derived = prj.forHost(hostNode);
+
+                break;
+            }
+
+            default:
+                return throwUnsupported(type);
+        }
+
+        return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)derived);
+    }
+
+ /**
+ * @param exclude Projection to exclude.
+ * @return New projection.
+ */
+ public PlatformClusterGroup forOthers(PlatformClusterGroup exclude) {
+ return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOthers(exclude.prj));
+ }
+
+ /**
+ * @return New projection.
+ */
+ public PlatformClusterGroup forRemotes() {
+ return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRemotes());
+ }
+
+ /**
+ * @return New projection.
+ */
+ public PlatformClusterGroup forDaemons() {
+ return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forDaemons());
+ }
+
+ /**
+ * @return New projection.
+ */
+ public PlatformClusterGroup forRandom() {
+ return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forRandom());
+ }
+
+ /**
+ * @return New projection.
+ */
+ public PlatformClusterGroup forOldest() {
+ return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forOldest());
+ }
+
+ /**
+ * @return New projection.
+ */
+ public PlatformClusterGroup forYoungest() {
+ return new PlatformClusterGroup(platformCtx, (ClusterGroupEx)prj.forYoungest());
+ }
+
+ /**
+ * @return Cluster group (projection) wrapped by this target, exactly as passed to the constructor.
+ */
+ public ClusterGroupEx projection() {
+ return prj;
+ }
+
+    /**
+     * Resets local I/O, job, and task execution metrics by delegating to {@code IgniteCluster.resetMetrics()}.
+     * The wrapped group must be the top-level cluster group, i.e. an {@code IgniteCluster}.
+     */
+    public void resetMetrics() {
+        // Can only be invoked on top-level cluster group.
+        assert prj instanceof IgniteCluster;
+
+        IgniteCluster cluster = (IgniteCluster)prj;
+
+        cluster.resetMetrics();
+    }
+
+    /**
+     * Pings a node by delegating to {@code IgniteCluster.pingNode}. The wrapped group must be the
+     * top-level cluster group, i.e. an {@code IgniteCluster}.
+     *
+     * @param nodeId ID of the node to ping.
+     * @return Result reported by {@code IgniteCluster.pingNode} for that node.
+     */
+    private boolean pingNode(UUID nodeId) {
+        // Can only be invoked on top-level cluster group.
+        assert prj instanceof IgniteCluster;
+
+        IgniteCluster cluster = (IgniteCluster)prj;
+
+        return cluster.pingNode(nodeId);
+    }
+
+    /**
+     * Looks up the set of nodes that made up a given topology version by delegating to
+     * {@code IgniteCluster.topology}. The wrapped group must be the top-level cluster group,
+     * i.e. an {@code IgniteCluster}.
+     * <p>
+     * Returns {@code null} if topology history storage doesn't contain the requested version
+     * (history currently keeps last {@code 1000} snapshots).
+     *
+     * @param topVer Topology version.
+     * @return Nodes of the requested topology version if it is present in history storage,
+     *      {@code null} otherwise.
+     * @throws UnsupportedOperationException If underlying SPI implementation does not support
+     *      topology history. Currently only {@link org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi}
+     *      supports topology history.
+     */
+    private Collection<ClusterNode> topology(long topVer) {
+        // Can only be invoked on top-level cluster group.
+        assert prj instanceof IgniteCluster;
+
+        IgniteCluster cluster = (IgniteCluster)prj;
+
+        return cluster.topology(topVer);
+    }
+}