You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/03/01 18:23:28 UTC
incubator-ignite git commit: # IGNITE-301 Rename IgniteClusterImpl to
ClusterProcessor.
Repository: incubator-ignite
Updated Branches:
refs/heads/ignite-301 c91fd9dcf -> 8d50c599d
# IGNITE-301 Rename IgniteClusterImpl to ClusterProcessor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8d50c599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8d50c599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8d50c599
Branch: refs/heads/ignite-301
Commit: 8d50c599dc32fadb1c546c8667d35275f6f9b0fd
Parents: c91fd9d
Author: sevdokimov <se...@jetbrains.com>
Authored: Sun Mar 1 20:23:22 2015 +0300
Committer: sevdokimov <se...@jetbrains.com>
Committed: Sun Mar 1 20:23:22 2015 +0300
----------------------------------------------------------------------
.../ignite/internal/GridKernalContext.java | 2 +-
.../ignite/internal/GridKernalContextImpl.java | 8 +-
.../apache/ignite/internal/IgniteKernal.java | 2 +-
.../internal/cluster/ClusterProcessor.java | 579 +++++++++++++++++++
.../cluster/IgniteClusterAsyncImpl.java | 6 +-
.../internal/cluster/IgniteClusterImpl.java | 579 -------------------
6 files changed, 588 insertions(+), 588 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index c342e59..5935658 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -515,5 +515,5 @@ public interface GridKernalContext extends Iterable<GridComponent> {
*
* @return Cluster processor.
*/
- public IgniteClusterImpl cluster();
+ public ClusterProcessor cluster();
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index e8c3e4f..bb85818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -246,7 +246,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
/** */
@GridToStringExclude
- private IgniteClusterImpl cluster;
+ private ClusterProcessor cluster;
/** */
@GridToStringExclude
@@ -466,8 +466,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
qryProc = (GridQueryProcessor)comp;
else if (comp instanceof DataStructuresProcessor)
dataStructuresProc = (DataStructuresProcessor)comp;
- else if (comp instanceof IgniteClusterImpl)
- cluster = (IgniteClusterImpl)comp;
+ else if (comp instanceof ClusterProcessor)
+ cluster = (ClusterProcessor)comp;
else
assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
@@ -861,7 +861,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
}
/** {@inheritDoc} */
- @Override public IgniteClusterImpl cluster() {
+ @Override public ClusterProcessor cluster() {
return cluster;
}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e3e0b0a..ee0428d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -674,7 +674,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
igfsExecSvc,
restExecSvc);
- startProcessor(ctx, new IgniteClusterImpl(ctx), attrs);
+ startProcessor(ctx, new ClusterProcessor(ctx), attrs);
U.onGridStart();
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java
new file mode 100644
index 0000000..413b108
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nodestart.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*;
+
+/**
+ *
+ */
+public class ClusterProcessor extends ClusterGroupAdapter implements IgniteClusterEx, GridProcessor {
+ /** */
+ private static final long serialVersionUID = 0L;
+
+ /** */
+ private IgniteConfiguration cfg;
+
+ /** Node local store. */
+ @GridToStringExclude
+ private ConcurrentMap nodeLoc;
+
+ /**
+ * Required by {@link Externalizable}.
+ */
+ public ClusterProcessor() {
+ // No-op.
+ }
+
+ /**
+ * @param ctx Kernal context.
+ */
+ public ClusterProcessor(GridKernalContext ctx) {
+ super(ctx, null, (IgnitePredicate<ClusterNode>)null);
+
+ cfg = ctx.config();
+
+ nodeLoc = new ClusterNodeLocalMapImpl(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterGroup forLocal() {
+ guard();
+
+ try {
+ return new ClusterGroupAdapter(ctx, null, Collections.singleton(cfg.getNodeId()));
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode localNode() {
+ guard();
+
+ try {
+ ClusterNode node = ctx.discovery().localNode();
+
+ assert node != null;
+
+ return node;
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() {
+ guard();
+
+ try {
+ return nodeLoc;
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean pingNode(UUID nodeId) {
+ A.notNull(nodeId, "nodeId");
+
+ guard();
+
+ try {
+ return ctx.discovery().pingNode(nodeId);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public long topologyVersion() {
+ guard();
+
+ try {
+ return ctx.discovery().topologyVersion();
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException {
+ guard();
+
+ try {
+ return ctx.discovery().topology(topVer);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName,
+ @Nullable Collection<? extends K> keys)
+ throws IgniteException
+ {
+ if (F.isEmpty(keys))
+ return Collections.emptyMap();
+
+ guard();
+
+ try {
+ return ctx.affinity().mapKeysToNodes(cacheName, keys);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException {
+ A.notNull(key, "key");
+
+ guard();
+
+ try {
+ return ctx.affinity().mapKeyToNode(cacheName, key);
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file,
+ boolean restart,
+ int timeout,
+ int maxConn)
+ throws IgniteException
+ {
+ try {
+ return startNodesAsync(file, restart, timeout, maxConn).get();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(Collection<Map<String, Object>> hosts,
+ @Nullable Map<String, Object> dflts,
+ boolean restart,
+ int timeout,
+ int maxConn)
+ throws IgniteException
+ {
+ try {
+ return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get();
+ }
+ catch (IgniteCheckedException e) {
+ throw U.convertException(e);
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stopNodes() throws IgniteException {
+ guard();
+
+ try {
+ compute().execute(IgniteKillTask.class, false);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stopNodes(Collection<UUID> ids) throws IgniteException {
+ guard();
+
+ try {
+ ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, false);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restartNodes() throws IgniteException {
+ guard();
+
+ try {
+ compute().execute(IgniteKillTask.class, true);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void restartNodes(Collection<UUID> ids) throws IgniteException {
+ guard();
+
+ try {
+ ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, true);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resetMetrics() {
+ guard();
+
+ try {
+ ctx.jobMetric().reset();
+ ctx.io().resetMetrics();
+ ctx.task().resetMetrics();
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCluster withAsync() {
+ return new IgniteClusterAsyncImpl(this);
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isAsync() {
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> IgniteFuture<R> future() {
+ throw new IllegalStateException("Asynchronous mode is not enabled.");
+ }
+
+ /**
+ * @param file Configuration file.
+ * @param restart Whether to stop existing nodes.
+ * @param timeout Connection timeout.
+ * @param maxConn Number of parallel SSH connections to one host.
+ * @return Future with results.
+ * @see {@link IgniteCluster#startNodes(java.io.File, boolean, int, int)}.
+ */
+ IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file,
+ boolean restart,
+ int timeout,
+ int maxConn)
+ {
+ A.notNull(file, "file");
+ A.ensure(file.exists(), "file doesn't exist.");
+ A.ensure(file.isFile(), "file is a directory.");
+
+ try {
+ IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file);
+
+ return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn);
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(ctx, e);
+ }
+ }
+
+ /**
+ * @param hosts Startup parameters.
+ * @param dflts Default values.
+ * @param restart Whether to stop existing nodes
+ * @param timeout Connection timeout in milliseconds.
+ * @param maxConn Number of parallel SSH connections to one host.
+ * @return Future with results.
+ * @see {@link IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)}.
+ */
+ IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(
+ Collection<Map<String, Object>> hosts,
+ @Nullable Map<String, Object> dflts,
+ boolean restart,
+ int timeout,
+ int maxConn)
+ {
+ A.notNull(hosts, "hosts");
+
+ guard();
+
+ try {
+ IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false);
+
+ Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts);
+
+ Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>();
+
+ int nodeCallCnt = 0;
+
+ for (String host : specsMap.keySet()) {
+ InetAddress addr;
+
+ try {
+ addr = InetAddress.getByName(host);
+ }
+ catch (UnknownHostException e) {
+ throw new IgniteCheckedException("Invalid host name: " + host, e);
+ }
+
+ Collection<? extends ClusterNode> neighbors = null;
+
+ if (addr.isLoopbackAddress())
+ neighbors = neighbors();
+ else {
+ for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) {
+ ClusterNode node = F.first(p);
+
+ if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) {
+ neighbors = p;
+
+ break;
+ }
+ }
+ }
+
+ int startIdx = 1;
+
+ if (neighbors != null) {
+ if (restart && !neighbors.isEmpty()) {
+ try {
+ ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false);
+ }
+ catch (ClusterGroupEmptyException ignored) {
+ // No-op, nothing to restart.
+ }
+ }
+ else
+ startIdx = neighbors.size() + 1;
+ }
+
+ ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>();
+
+ runMap.put(host, nodeRuns);
+
+ for (IgniteRemoteStartSpecification spec : specsMap.get(host)) {
+ assert spec.host().equals(host);
+
+ for (int i = startIdx; i <= spec.nodes(); i++) {
+ nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout));
+
+ nodeCallCnt++;
+ }
+ }
+ }
+
+ // If there is nothing to start, return finished future with empty result.
+ if (nodeCallCnt == 0)
+ return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>(
+ ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList());
+
+ // Exceeding max line width for readability.
+ GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
+ fut = new GridCompoundFuture<>(
+ ctx,
+ CU.<GridTuple3<String, Boolean, String>>objectsReducer()
+ );
+
+ AtomicInteger cnt = new AtomicInteger(nodeCallCnt);
+
+ // Limit maximum simultaneous connection number per host.
+ for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) {
+ for (int i = 0; i < maxConn; i++) {
+ if (!runNextNodeCallable(queue, fut, cnt))
+ break;
+ }
+ }
+
+ return fut;
+ }
+ catch (IgniteCheckedException e) {
+ return new GridFinishedFuture<>(ctx, e);
+ }
+ finally {
+ unguard();
+ }
+ }
+
+ /**
+ * Gets the all grid nodes that reside on the same physical computer as local grid node.
+ * Local grid node is excluded.
+ * <p>
+ * Detection of the same physical computer is based on comparing set of network interface MACs.
+ * If two nodes have the same set of MACs, Ignite considers these nodes running on the same
+ * physical computer.
+ * @return Grid nodes that reside on the same physical computer as local grid node.
+ */
+ private Collection<ClusterNode> neighbors() {
+ Collection<ClusterNode> neighbors = new ArrayList<>(1);
+
+ String macs = localNode().attribute(ATTR_MACS);
+
+ assert macs != null;
+
+ for (ClusterNode n : forOthers(localNode()).nodes()) {
+ if (macs.equals(n.attribute(ATTR_MACS)))
+ neighbors.add(n);
+ }
+
+ return neighbors;
+ }
+
+ /**
+ * Runs next callable from host node start queue.
+ *
+ * @param queue Queue of tasks to poll from.
+ * @param comp Compound future that comprise all started node tasks.
+ * @param cnt Atomic counter to check if all futures are added to compound future.
+ * @return {@code True} if task was started, {@code false} if queue was empty.
+ */
+ private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue,
+ final GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
+ comp,
+ final AtomicInteger cnt)
+ {
+ IgniteNodeCallable call = queue.poll();
+
+ if (call == null)
+ return false;
+
+ IgniteInternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true);
+
+ comp.add(fut);
+
+ if (cnt.decrementAndGet() == 0)
+ comp.markInitialized();
+
+ fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() {
+ @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) {
+ runNextNodeCallable(queue, comp, cnt);
+ }
+ });
+
+ return true;
+ }
+
+ /**
+ * Clears node local map.
+ */
+ public void clearNodeMap() {
+ nodeLoc.clear();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ ctx = (GridKernalContext)in.readObject();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeObject(ctx);
+ }
+
+ /** {@inheritDoc} */
+ @Override protected Object readResolve() throws ObjectStreamException {
+ return ctx.grid().cluster();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStart() throws IgniteCheckedException {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Object collectDiscoveryData(UUID nodeId) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Override public void printMemoryStats() {
+ // No-op.
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ public String toString() {
+ return "IgniteCluster [igniteName=" + ctx.gridName() + ']';
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index 960bacd..43a6435 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -38,7 +38,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
private static final long serialVersionUID = 0L;
/** */
- private IgniteClusterImpl cluster;
+ private ClusterProcessor cluster;
/**
* Required by {@link Externalizable}.
@@ -50,7 +50,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
/**
* @param cluster Cluster.
*/
- public IgniteClusterAsyncImpl(IgniteClusterImpl cluster) {
+ public IgniteClusterAsyncImpl(ClusterProcessor cluster) {
super(true);
this.cluster = cluster;
@@ -274,7 +274,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
/** {@inheritDoc} */
@Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- cluster = (IgniteClusterImpl)in.readObject();
+ cluster = (ClusterProcessor)in.readObject();
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
deleted file mode 100644
index 45462a4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.cluster;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.nodestart.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.IgniteNodeAttributes.*;
-import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*;
-
-/**
- *
- */
-public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClusterEx, GridProcessor {
- /** */
- private static final long serialVersionUID = 0L;
-
- /** */
- private IgniteConfiguration cfg;
-
- /** Node local store. */
- @GridToStringExclude
- private ConcurrentMap nodeLoc;
-
- /**
- * Required by {@link Externalizable}.
- */
- public IgniteClusterImpl() {
- // No-op.
- }
-
- /**
- * @param ctx Kernal context.
- */
- public IgniteClusterImpl(GridKernalContext ctx) {
- super(ctx, null, (IgnitePredicate<ClusterNode>)null);
-
- cfg = ctx.config();
-
- nodeLoc = new ClusterNodeLocalMapImpl(ctx);
- }
-
- /** {@inheritDoc} */
- @Override public ClusterGroup forLocal() {
- guard();
-
- try {
- return new ClusterGroupAdapter(ctx, null, Collections.singleton(cfg.getNodeId()));
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public ClusterNode localNode() {
- guard();
-
- try {
- ClusterNode node = ctx.discovery().localNode();
-
- assert node != null;
-
- return node;
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("unchecked")
- @Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() {
- guard();
-
- try {
- return nodeLoc;
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public boolean pingNode(UUID nodeId) {
- A.notNull(nodeId, "nodeId");
-
- guard();
-
- try {
- return ctx.discovery().pingNode(nodeId);
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public long topologyVersion() {
- guard();
-
- try {
- return ctx.discovery().topologyVersion();
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException {
- guard();
-
- try {
- return ctx.discovery().topology(topVer);
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName,
- @Nullable Collection<? extends K> keys)
- throws IgniteException
- {
- if (F.isEmpty(keys))
- return Collections.emptyMap();
-
- guard();
-
- try {
- return ctx.affinity().mapKeysToNodes(cacheName, keys);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException {
- A.notNull(key, "key");
-
- guard();
-
- try {
- return ctx.affinity().mapKeyToNode(cacheName, key);
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file,
- boolean restart,
- int timeout,
- int maxConn)
- throws IgniteException
- {
- try {
- return startNodesAsync(file, restart, timeout, maxConn).get();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(Collection<Map<String, Object>> hosts,
- @Nullable Map<String, Object> dflts,
- boolean restart,
- int timeout,
- int maxConn)
- throws IgniteException
- {
- try {
- return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get();
- }
- catch (IgniteCheckedException e) {
- throw U.convertException(e);
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stopNodes() throws IgniteException {
- guard();
-
- try {
- compute().execute(IgniteKillTask.class, false);
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void stopNodes(Collection<UUID> ids) throws IgniteException {
- guard();
-
- try {
- ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, false);
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void restartNodes() throws IgniteException {
- guard();
-
- try {
- compute().execute(IgniteKillTask.class, true);
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void restartNodes(Collection<UUID> ids) throws IgniteException {
- guard();
-
- try {
- ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, true);
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public void resetMetrics() {
- guard();
-
- try {
- ctx.jobMetric().reset();
- ctx.io().resetMetrics();
- ctx.task().resetMetrics();
- }
- finally {
- unguard();
- }
- }
-
- /** {@inheritDoc} */
- @Override public IgniteCluster withAsync() {
- return new IgniteClusterAsyncImpl(this);
- }
-
- /** {@inheritDoc} */
- @Override public boolean isAsync() {
- return false;
- }
-
- /** {@inheritDoc} */
- @Override public <R> IgniteFuture<R> future() {
- throw new IllegalStateException("Asynchronous mode is not enabled.");
- }
-
- /**
- * @param file Configuration file.
- * @param restart Whether to stop existing nodes.
- * @param timeout Connection timeout.
- * @param maxConn Number of parallel SSH connections to one host.
- * @return Future with results.
- * @see {@link IgniteCluster#startNodes(java.io.File, boolean, int, int)}.
- */
- IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file,
- boolean restart,
- int timeout,
- int maxConn)
- {
- A.notNull(file, "file");
- A.ensure(file.exists(), "file doesn't exist.");
- A.ensure(file.isFile(), "file is a directory.");
-
- try {
- IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file);
-
- return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn);
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(ctx, e);
- }
- }
-
- /**
- * @param hosts Startup parameters.
- * @param dflts Default values.
- * @param restart Whether to stop existing nodes
- * @param timeout Connection timeout in milliseconds.
- * @param maxConn Number of parallel SSH connections to one host.
- * @return Future with results.
- * @see {@link IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)}.
- */
- IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(
- Collection<Map<String, Object>> hosts,
- @Nullable Map<String, Object> dflts,
- boolean restart,
- int timeout,
- int maxConn)
- {
- A.notNull(hosts, "hosts");
-
- guard();
-
- try {
- IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false);
-
- Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts);
-
- Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>();
-
- int nodeCallCnt = 0;
-
- for (String host : specsMap.keySet()) {
- InetAddress addr;
-
- try {
- addr = InetAddress.getByName(host);
- }
- catch (UnknownHostException e) {
- throw new IgniteCheckedException("Invalid host name: " + host, e);
- }
-
- Collection<? extends ClusterNode> neighbors = null;
-
- if (addr.isLoopbackAddress())
- neighbors = neighbors();
- else {
- for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) {
- ClusterNode node = F.first(p);
-
- if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) {
- neighbors = p;
-
- break;
- }
- }
- }
-
- int startIdx = 1;
-
- if (neighbors != null) {
- if (restart && !neighbors.isEmpty()) {
- try {
- ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false);
- }
- catch (ClusterGroupEmptyException ignored) {
- // No-op, nothing to restart.
- }
- }
- else
- startIdx = neighbors.size() + 1;
- }
-
- ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>();
-
- runMap.put(host, nodeRuns);
-
- for (IgniteRemoteStartSpecification spec : specsMap.get(host)) {
- assert spec.host().equals(host);
-
- for (int i = startIdx; i <= spec.nodes(); i++) {
- nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout));
-
- nodeCallCnt++;
- }
- }
- }
-
- // If there is nothing to start, return finished future with empty result.
- if (nodeCallCnt == 0)
- return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>(
- ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList());
-
- // Exceeding max line width for readability.
- GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
- fut = new GridCompoundFuture<>(
- ctx,
- CU.<GridTuple3<String, Boolean, String>>objectsReducer()
- );
-
- AtomicInteger cnt = new AtomicInteger(nodeCallCnt);
-
- // Limit maximum simultaneous connection number per host.
- for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) {
- for (int i = 0; i < maxConn; i++) {
- if (!runNextNodeCallable(queue, fut, cnt))
- break;
- }
- }
-
- return fut;
- }
- catch (IgniteCheckedException e) {
- return new GridFinishedFuture<>(ctx, e);
- }
- finally {
- unguard();
- }
- }
-
- /**
- * Gets the all grid nodes that reside on the same physical computer as local grid node.
- * Local grid node is excluded.
- * <p>
- * Detection of the same physical computer is based on comparing set of network interface MACs.
- * If two nodes have the same set of MACs, Ignite considers these nodes running on the same
- * physical computer.
- * @return Grid nodes that reside on the same physical computer as local grid node.
- */
- private Collection<ClusterNode> neighbors() {
- Collection<ClusterNode> neighbors = new ArrayList<>(1);
-
- String macs = localNode().attribute(ATTR_MACS);
-
- assert macs != null;
-
- for (ClusterNode n : forOthers(localNode()).nodes()) {
- if (macs.equals(n.attribute(ATTR_MACS)))
- neighbors.add(n);
- }
-
- return neighbors;
- }
-
- /**
- * Runs next callable from host node start queue.
- *
- * @param queue Queue of tasks to poll from.
- * @param comp Compound future that comprise all started node tasks.
- * @param cnt Atomic counter to check if all futures are added to compound future.
- * @return {@code True} if task was started, {@code false} if queue was empty.
- */
- private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue,
- final GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
- comp,
- final AtomicInteger cnt)
- {
- IgniteNodeCallable call = queue.poll();
-
- if (call == null)
- return false;
-
- IgniteInternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true);
-
- comp.add(fut);
-
- if (cnt.decrementAndGet() == 0)
- comp.markInitialized();
-
- fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() {
- @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) {
- runNextNodeCallable(queue, comp, cnt);
- }
- });
-
- return true;
- }
-
- /**
- * Clears node local map.
- */
- public void clearNodeMap() {
- nodeLoc.clear();
- }
-
- /** {@inheritDoc} */
- @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
- ctx = (GridKernalContext)in.readObject();
- }
-
- /** {@inheritDoc} */
- @Override public void writeExternal(ObjectOutput out) throws IOException {
- out.writeObject(ctx);
- }
-
- /** {@inheritDoc} */
- @Override protected Object readResolve() throws ObjectStreamException {
- return ctx.grid().cluster();
- }
-
- /** {@inheritDoc} */
- @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void stop(boolean cancel) throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStart() throws IgniteCheckedException {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public Object collectDiscoveryData(UUID nodeId) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Override public void printMemoryStats() {
- // No-op.
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) {
- return null;
- }
-
- /** {@inheritDoc} */
- @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
- return null;
- }
-
- /** {@inheritDoc} */
- public String toString() {
- return "IgniteCluster [igniteName=" + ctx.gridName() + ']';
- }
-}