You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by se...@apache.org on 2015/03/01 18:23:28 UTC

incubator-ignite git commit: # IGNITE-301 Rename IgniteClusterImpl to ClusterProcessor.

Repository: incubator-ignite
Updated Branches:
  refs/heads/ignite-301 c91fd9dcf -> 8d50c599d


# IGNITE-301 Rename IgniteClusterImpl to ClusterProcessor.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ignite/commit/8d50c599
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ignite/tree/8d50c599
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ignite/diff/8d50c599

Branch: refs/heads/ignite-301
Commit: 8d50c599dc32fadb1c546c8667d35275f6f9b0fd
Parents: c91fd9d
Author: sevdokimov <se...@jetbrains.com>
Authored: Sun Mar 1 20:23:22 2015 +0300
Committer: sevdokimov <se...@jetbrains.com>
Committed: Sun Mar 1 20:23:22 2015 +0300

----------------------------------------------------------------------
 .../ignite/internal/GridKernalContext.java      |   2 +-
 .../ignite/internal/GridKernalContextImpl.java  |   8 +-
 .../apache/ignite/internal/IgniteKernal.java    |   2 +-
 .../internal/cluster/ClusterProcessor.java      | 579 +++++++++++++++++++
 .../cluster/IgniteClusterAsyncImpl.java         |   6 +-
 .../internal/cluster/IgniteClusterImpl.java     | 579 -------------------
 6 files changed, 588 insertions(+), 588 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
index c342e59..5935658 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContext.java
@@ -515,5 +515,5 @@ public interface GridKernalContext extends Iterable<GridComponent> {
      *
      * @return Cluster processor.
      */
-    public IgniteClusterImpl cluster();
+    public ClusterProcessor cluster();
 }

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
index e8c3e4f..bb85818 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/GridKernalContextImpl.java
@@ -246,7 +246,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
 
     /** */
     @GridToStringExclude
-    private IgniteClusterImpl cluster;
+    private ClusterProcessor cluster;
 
     /** */
     @GridToStringExclude
@@ -466,8 +466,8 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
             qryProc = (GridQueryProcessor)comp;
         else if (comp instanceof DataStructuresProcessor)
             dataStructuresProc = (DataStructuresProcessor)comp;
-        else if (comp instanceof IgniteClusterImpl)
-            cluster = (IgniteClusterImpl)comp;
+        else if (comp instanceof ClusterProcessor)
+            cluster = (ClusterProcessor)comp;
         else
             assert (comp instanceof GridPluginComponent) : "Unknown manager class: " + comp.getClass();
 
@@ -861,7 +861,7 @@ public class GridKernalContextImpl implements GridKernalContext, Externalizable
     }
 
     /** {@inheritDoc} */
-    @Override public IgniteClusterImpl cluster() {
+    @Override public ClusterProcessor cluster() {
         return cluster;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
index e3e0b0a..ee0428d 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/IgniteKernal.java
@@ -674,7 +674,7 @@ public class IgniteKernal implements IgniteEx, IgniteMXBean, Externalizable {
                 igfsExecSvc,
                 restExecSvc);
 
-            startProcessor(ctx, new IgniteClusterImpl(ctx), attrs);
+            startProcessor(ctx, new ClusterProcessor(ctx), attrs);
 
             U.onGridStart();
 

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java
new file mode 100644
index 0000000..413b108
--- /dev/null
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/ClusterProcessor.java
@@ -0,0 +1,579 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.cluster;
+
+import org.apache.ignite.*;
+import org.apache.ignite.cluster.*;
+import org.apache.ignite.configuration.*;
+import org.apache.ignite.internal.*;
+import org.apache.ignite.internal.processors.*;
+import org.apache.ignite.internal.util.future.*;
+import org.apache.ignite.internal.util.lang.*;
+import org.apache.ignite.internal.util.nodestart.*;
+import org.apache.ignite.internal.util.tostring.*;
+import org.apache.ignite.internal.util.typedef.*;
+import org.apache.ignite.internal.util.typedef.internal.*;
+import org.apache.ignite.lang.*;
+import org.apache.ignite.spi.*;
+import org.jetbrains.annotations.*;
+
+import java.io.*;
+import java.net.*;
+import java.util.*;
+import java.util.concurrent.*;
+import java.util.concurrent.atomic.*;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.*;
+import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*;
+
+/**
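+ * {@link IgniteClusterEx} implementation that is also started by the kernal as a {@link GridProcessor}.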
+ */
+public class ClusterProcessor extends ClusterGroupAdapter implements IgniteClusterEx, GridProcessor {
+    /** */
+    private static final long serialVersionUID = 0L;
+
+    /** */
+    private IgniteConfiguration cfg;
+
+    /** Node local store. */
+    @GridToStringExclude
+    private ConcurrentMap nodeLoc;
+
+    /**
+     * Required by {@link Externalizable}.
+     */
+    public ClusterProcessor() {
+        // No-op.
+    }
+
+    /**
+     * @param ctx Kernal context.
+     */
+    public ClusterProcessor(GridKernalContext ctx) {
+        super(ctx, null, (IgnitePredicate<ClusterNode>)null);
+
+        cfg = ctx.config();
+
+        nodeLoc = new ClusterNodeLocalMapImpl(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterGroup forLocal() {
+        guard();
+
+        try {
+            return new ClusterGroupAdapter(ctx, null, Collections.singleton(cfg.getNodeId()));
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public ClusterNode localNode() {
+        guard();
+
+        try {
+            ClusterNode node = ctx.discovery().localNode();
+
+            assert node != null;
+
+            return node;
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @SuppressWarnings("unchecked")
+    @Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() {
+        guard();
+
+        try {
+            return nodeLoc;
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean pingNode(UUID nodeId) {
+        A.notNull(nodeId, "nodeId");
+
+        guard();
+
+        try {
+            return ctx.discovery().pingNode(nodeId);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public long topologyVersion() {
+        guard();
+
+        try {
+            return ctx.discovery().topologyVersion();
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException {
+        guard();
+
+        try {
+            return ctx.discovery().topology(topVer);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName,
+        @Nullable Collection<? extends K> keys)
+        throws IgniteException
+    {
+        if (F.isEmpty(keys))
+            return Collections.emptyMap();
+
+        guard();
+
+        try {
+            return ctx.affinity().mapKeysToNodes(cacheName, keys);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException {
+        A.notNull(key, "key");
+
+        guard();
+
+        try {
+            return ctx.affinity().mapKeyToNode(cacheName, key);
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file,
+        boolean restart,
+        int timeout,
+        int maxConn)
+        throws IgniteException
+    {
+        try {
+            return startNodesAsync(file, restart, timeout, maxConn).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(Collection<Map<String, Object>> hosts,
+        @Nullable Map<String, Object> dflts,
+        boolean restart,
+        int timeout,
+        int maxConn)
+        throws IgniteException
+    {
+        try {
+            return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get();
+        }
+        catch (IgniteCheckedException e) {
+            throw U.convertException(e);
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stopNodes() throws IgniteException {
+        guard();
+
+        try {
+            compute().execute(IgniteKillTask.class, false);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stopNodes(Collection<UUID> ids) throws IgniteException {
+        guard();
+
+        try {
+            ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, false);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void restartNodes() throws IgniteException {
+        guard();
+
+        try {
+            compute().execute(IgniteKillTask.class, true);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void restartNodes(Collection<UUID> ids) throws IgniteException {
+        guard();
+
+        try {
+            ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, true);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public void resetMetrics() {
+        guard();
+
+        try {
+            ctx.jobMetric().reset();
+            ctx.io().resetMetrics();
+            ctx.task().resetMetrics();
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /** {@inheritDoc} */
+    @Override public IgniteCluster withAsync() {
+        return new IgniteClusterAsyncImpl(this);
+    }
+
+    /** {@inheritDoc} */
+    @Override public boolean isAsync() {
+        return false;
+    }
+
+    /** {@inheritDoc} */
+    @Override public <R> IgniteFuture<R> future() {
+        throw new IllegalStateException("Asynchronous mode is not enabled.");
+    }
+
+    /**
+     * @param file Configuration file; must exist and must be a regular file.
+     * @param restart Whether to stop existing nodes.
+     * @param timeout Connection timeout in milliseconds.
+     * @param maxConn Number of parallel SSH connections to one host.
+     * @return Future with results; a finished future holding the error if the file cannot be parsed.
+     * @see IgniteCluster#startNodes(java.io.File, boolean, int, int)
+     */
+    IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file,
+      boolean restart,
+      int timeout,
+      int maxConn)
+    {
+        A.notNull(file, "file");
+        A.ensure(file.exists(), "file doesn't exist.");
+        A.ensure(file.isFile(), "file is a directory.");
+
+        try {
+            IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> t = parseFile(file);
+
+            return startNodesAsync(t.get1(), t.get2(), restart, timeout, maxConn);
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+    }
+
+    /**
+     * @param hosts Startup parameters.
+     * @param dflts Default values.
+     * @param restart Whether to stop existing nodes
+     * @param timeout Connection timeout in milliseconds.
+     * @param maxConn Number of parallel SSH connections to one host.
+     * @return Future with results.
+     * @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)
+     */
+    IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(
+        Collection<Map<String, Object>> hosts,
+        @Nullable Map<String, Object> dflts,
+        boolean restart,
+        int timeout,
+        int maxConn)
+    {
+        A.notNull(hosts, "hosts");
+
+        guard();
+
+        try {
+            IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false);
+
+            Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts);
+
+            Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>();
+
+            int nodeCallCnt = 0;
+
+            for (String host : specsMap.keySet()) {
+                InetAddress addr;
+
+                try {
+                    addr = InetAddress.getByName(host);
+                }
+                catch (UnknownHostException e) {
+                    throw new IgniteCheckedException("Invalid host name: " + host, e);
+                }
+
+                Collection<? extends ClusterNode> neighbors = null;
+
+                if (addr.isLoopbackAddress())
+                    neighbors = neighbors();
+                else {
+                    for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) {
+                        ClusterNode node = F.first(p);
+
+                        if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) {
+                            neighbors = p;
+
+                            break;
+                        }
+                    }
+                }
+
+                int startIdx = 1;
+
+                if (neighbors != null) {
+                    if (restart && !neighbors.isEmpty()) {
+                        try {
+                            ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false);
+                        }
+                        catch (ClusterGroupEmptyException ignored) {
+                            // No-op, nothing to restart.
+                        }
+                    }
+                    else
+                        startIdx = neighbors.size() + 1;
+                }
+
+                ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>();
+
+                runMap.put(host, nodeRuns);
+
+                for (IgniteRemoteStartSpecification spec : specsMap.get(host)) {
+                    assert spec.host().equals(host);
+
+                    for (int i = startIdx; i <= spec.nodes(); i++) {
+                        nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout));
+
+                        nodeCallCnt++;
+                    }
+                }
+            }
+
+            // If there is nothing to start, return finished future with empty result.
+            if (nodeCallCnt == 0)
+                return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>(
+                    ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList());
+
+            // Exceeding max line width for readability.
+            GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
+                fut = new GridCompoundFuture<>(
+                ctx,
+                CU.<GridTuple3<String, Boolean, String>>objectsReducer()
+            );
+
+            AtomicInteger cnt = new AtomicInteger(nodeCallCnt);
+
+            // Limit maximum simultaneous connection number per host.
+            for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) {
+                for (int i = 0; i < maxConn; i++) {
+                    if (!runNextNodeCallable(queue, fut, cnt))
+                        break;
+                }
+            }
+
+            return fut;
+        }
+        catch (IgniteCheckedException e) {
+            return new GridFinishedFuture<>(ctx, e);
+        }
+        finally {
+            unguard();
+        }
+    }
+
+    /**
+     * Gets all grid nodes that reside on the same physical computer as the local grid node.
+     * The local grid node itself is excluded.
+     * <p>
+     * Detection of the same physical computer is based on comparing the {@code ATTR_MACS} node attribute.
+     * If a node reports the same MACs attribute value as the local node, Ignite considers it to be
+     * running on the same physical computer.
+     * @return Grid nodes that reside on the same physical computer as the local grid node.
+     */
+    private Collection<ClusterNode> neighbors() {
+        Collection<ClusterNode> neighbors = new ArrayList<>(1);
+
+        String macs = localNode().attribute(ATTR_MACS);
+
+        assert macs != null;
+
+        for (ClusterNode n : forOthers(localNode()).nodes()) {
+            if (macs.equals(n.attribute(ATTR_MACS)))
+                neighbors.add(n);
+        }
+
+        return neighbors;
+    }
+
+    /**
+     * Runs next callable from host node start queue.
+     *
+     * @param queue Queue of tasks to poll from.
+     * @param comp Compound future that comprise all started node tasks.
+     * @param cnt Atomic counter to check if all futures are added to compound future.
+     * @return {@code True} if task was started, {@code false} if queue was empty.
+     */
+    private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue,
+        final GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
+        comp,
+        final AtomicInteger cnt)
+    {
+        IgniteNodeCallable call = queue.poll();
+
+        if (call == null)
+            return false;
+
+        IgniteInternalFuture<GridTuple3<String, Boolean, String>> fut = ctx.closure().callLocalSafe(call, true);
+
+        comp.add(fut);
+
+        if (cnt.decrementAndGet() == 0)
+            comp.markInitialized();
+
+        fut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() {
+            @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) {
+                runNextNodeCallable(queue, comp, cnt);
+            }
+        });
+
+        return true;
+    }
+
+    /**
+     * Clears node local map.
+     */
+    public void clearNodeMap() {
+        nodeLoc.clear();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+        ctx = (GridKernalContext)in.readObject();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void writeExternal(ObjectOutput out) throws IOException {
+        out.writeObject(ctx);
+    }
+
+    /** {@inheritDoc} */
+    @Override protected Object readResolve() throws ObjectStreamException {
+        return ctx.grid().cluster();
+    }
+
+    /** {@inheritDoc} */
+    @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void start() throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void stop(boolean cancel) throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStart() throws IgniteCheckedException {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onKernalStop(boolean cancel) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public Object collectDiscoveryData(UUID nodeId) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Override public void printMemoryStats() {
+        // No-op.
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
+        return null;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return "IgniteCluster [igniteName=" + ctx.gridName() + ']';
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
index 960bacd..43a6435 100644
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
+++ b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterAsyncImpl.java
@@ -38,7 +38,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
     private static final long serialVersionUID = 0L;
 
     /** */
-    private IgniteClusterImpl cluster;
+    private ClusterProcessor cluster;
 
     /**
      * Required by {@link Externalizable}.
@@ -50,7 +50,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
     /**
      * @param cluster Cluster.
      */
-    public IgniteClusterAsyncImpl(IgniteClusterImpl cluster) {
+    public IgniteClusterAsyncImpl(ClusterProcessor cluster) {
         super(true);
 
         this.cluster = cluster;
@@ -274,7 +274,7 @@ public class IgniteClusterAsyncImpl extends AsyncSupportAdapter<IgniteCluster>
 
     /** {@inheritDoc} */
     @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        cluster = (IgniteClusterImpl)in.readObject();
+        cluster = (ClusterProcessor)in.readObject();
     }
 
     /** {@inheritDoc} */

http://git-wip-us.apache.org/repos/asf/incubator-ignite/blob/8d50c599/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
----------------------------------------------------------------------
diff --git a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java b/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
deleted file mode 100644
index 45462a4..0000000
--- a/modules/core/src/main/java/org/apache/ignite/internal/cluster/IgniteClusterImpl.java
+++ /dev/null
@@ -1,579 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.cluster;
-
-import org.apache.ignite.*;
-import org.apache.ignite.cluster.*;
-import org.apache.ignite.configuration.*;
-import org.apache.ignite.internal.*;
-import org.apache.ignite.internal.processors.*;
-import org.apache.ignite.internal.util.future.*;
-import org.apache.ignite.internal.util.lang.*;
-import org.apache.ignite.internal.util.nodestart.*;
-import org.apache.ignite.internal.util.tostring.*;
-import org.apache.ignite.internal.util.typedef.*;
-import org.apache.ignite.internal.util.typedef.internal.*;
-import org.apache.ignite.lang.*;
-import org.apache.ignite.spi.*;
-import org.jetbrains.annotations.*;
-
-import java.io.*;
-import java.net.*;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.*;
-
-import static org.apache.ignite.internal.IgniteNodeAttributes.*;
-import static org.apache.ignite.internal.util.nodestart.IgniteNodeStartUtils.*;
-
-/**
- *
- */
-public class IgniteClusterImpl extends ClusterGroupAdapter implements IgniteClusterEx, GridProcessor {
-    /** */
-    private static final long serialVersionUID = 0L;
-
-    /** */
-    private IgniteConfiguration cfg;
-
-    /** Node local store. */
-    @GridToStringExclude
-    private ConcurrentMap nodeLoc;
-
-    /**
-     * Required by {@link Externalizable}.
-     */
-    public IgniteClusterImpl() {
-        // No-op.
-    }
-
-    /**
-     * @param ctx Kernal context.
-     */
-    public IgniteClusterImpl(GridKernalContext ctx) {
-        super(ctx, null, (IgnitePredicate<ClusterNode>)null);
-
-        cfg = ctx.config();
-
-        nodeLoc = new ClusterNodeLocalMapImpl(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterGroup forLocal() {
-        guard();
-
-        try {
-            return new ClusterGroupAdapter(ctx, null, Collections.singleton(cfg.getNodeId()));
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public ClusterNode localNode() {
-        guard();
-
-        try {
-            ClusterNode node = ctx.discovery().localNode();
-
-            assert node != null;
-
-            return node;
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @SuppressWarnings("unchecked")
-    @Override public <K, V> ConcurrentMap<K, V> nodeLocalMap() {
-        guard();
-
-        try {
-            return nodeLoc;
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean pingNode(UUID nodeId) {
-        A.notNull(nodeId, "nodeId");
-
-        guard();
-
-        try {
-            return ctx.discovery().pingNode(nodeId);
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public long topologyVersion() {
-        guard();
-
-        try {
-            return ctx.discovery().topologyVersion();
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<ClusterNode> topology(long topVer) throws UnsupportedOperationException {
-        guard();
-
-        try {
-            return ctx.discovery().topology(topVer);
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K> Map<ClusterNode, Collection<K>> mapKeysToNodes(@Nullable String cacheName,
-        @Nullable Collection<? extends K> keys)
-        throws IgniteException
-    {
-        if (F.isEmpty(keys))
-            return Collections.emptyMap();
-
-        guard();
-
-        try {
-            return ctx.affinity().mapKeysToNodes(cacheName, keys);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public <K> ClusterNode mapKeyToNode(@Nullable String cacheName, K key) throws IgniteException {
-        A.notNull(key, "key");
-
-        guard();
-
-        try {
-            return ctx.affinity().mapKeyToNode(cacheName, key);
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(File file,
-        boolean restart,
-        int timeout,
-        int maxConn)
-        throws IgniteException
-    {
-        try {
-            return startNodesAsync(file, restart, timeout, maxConn).get();
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public Collection<GridTuple3<String, Boolean, String>> startNodes(Collection<Map<String, Object>> hosts,
-        @Nullable Map<String, Object> dflts,
-        boolean restart,
-        int timeout,
-        int maxConn)
-        throws IgniteException
-    {
-        try {
-            return startNodesAsync(hosts, dflts, restart, timeout, maxConn).get();
-        }
-        catch (IgniteCheckedException e) {
-            throw U.convertException(e);
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stopNodes() throws IgniteException {
-        guard();
-
-        try {
-            compute().execute(IgniteKillTask.class, false);
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stopNodes(Collection<UUID> ids) throws IgniteException {
-        // Argument handed to the kill task; restartNodes(Collection) passes true instead.
-        final boolean restart = false;
-
-        guard();
-
-        try {
-            // The task is executed over the projection built from the given node IDs.
-            ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, restart);
-        }
-        finally {
-            // Leave the guarded section even if the task execution throws.
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void restartNodes() throws IgniteException {
-        // Argument handed to the kill task; stopNodes() passes false instead.
-        final boolean restart = true;
-
-        guard();
-
-        try {
-            compute().execute(IgniteKillTask.class, restart);
-        }
-        finally {
-            // Leave the guarded section even if the task execution throws.
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void restartNodes(Collection<UUID> ids) throws IgniteException {
-        // Argument handed to the kill task; stopNodes(Collection) passes false instead.
-        final boolean restart = true;
-
-        guard();
-
-        try {
-            // The task is executed over the projection built from the given node IDs.
-            ctx.grid().compute(forNodeIds(ids)).execute(IgniteKillTask.class, restart);
-        }
-        finally {
-            // Leave the guarded section even if the task execution throws.
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public void resetMetrics() {
-        guard();
-
-        try {
-            // Reset the metrics of three context components in turn: job metrics, IO, and tasks.
-            ctx.jobMetric().reset();
-            ctx.io().resetMetrics();
-            ctx.task().resetMetrics();
-        }
-        finally {
-            // Leave the guarded section even if one of the resets throws.
-            unguard();
-        }
-    }
-
-    /** {@inheritDoc} */
-    @Override public IgniteCluster withAsync() {
-        return new IgniteClusterAsyncImpl(this);
-    }
-
-    /** {@inheritDoc} */
-    @Override public boolean isAsync() {
-        // This implementation always reports synchronous mode; withAsync() returns a separate wrapper object.
-        return false;
-    }
-
-    /** {@inheritDoc} */
-    @Override public <R> IgniteFuture<R> future() {
-        // Always fails here: this implementation reports isAsync() == false, so there is no future to return.
-        throw new IllegalStateException("Asynchronous mode is not enabled.");
-    }
-
-    /**
-     * Starts nodes described by a configuration file. The file is parsed into host
-     * parameters and default values, which are then passed to the collection-based overload.
-     *
-     * @param file Configuration file; must exist and must be a regular file.
-     * @param restart Whether to stop existing nodes.
-     * @param timeout Connection timeout.
-     * @param maxConn Number of parallel SSH connections to one host.
-     * @return Future with results; an already finished failed future if parsing the file fails.
-     * @see IgniteCluster#startNodes(java.io.File, boolean, int, int)
-     */
-    IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(File file,
-      boolean restart,
-      int timeout,
-      int maxConn)
-    {
-        // Argument validation happens up front, outside the try block.
-        A.notNull(file, "file");
-        A.ensure(file.exists(), "file doesn't exist.");
-        A.ensure(file.isFile(), "file is a directory.");
-
-        try {
-            IgniteBiTuple<Collection<Map<String, Object>>, Map<String, Object>> parsed = parseFile(file);
-
-            // First tuple element carries the host parameters, second the default values.
-            Collection<Map<String, Object>> hosts = parsed.get1();
-            Map<String, Object> dflts = parsed.get2();
-
-            return startNodesAsync(hosts, dflts, restart, timeout, maxConn);
-        }
-        catch (IgniteCheckedException e) {
-            // Report the failure through the returned future instead of throwing.
-            return new GridFinishedFuture<>(ctx, e);
-        }
-    }
-
-    /**
-     * Builds per-host queues of node start callables from the given specifications and
-     * starts up to {@code maxConn} of them per host; the remaining callables of a host are
-     * started one by one as earlier ones complete (see {@code runNextNodeCallable}).
-     * <p>
-     * Checked failures raised while preparing the start (for example an unresolvable host
-     * name) are reported through an already finished failed future rather than thrown.
-     *
-     * @param hosts Startup parameters.
-     * @param dflts Default values.
-     * @param restart Whether to stop existing nodes.
-     * @param timeout Connection timeout in milliseconds.
-     * @param maxConn Number of parallel SSH connections to one host.
-     * @return Future with results; a finished future with an empty list if there is nothing to start.
-     * @see IgniteCluster#startNodes(java.util.Collection, java.util.Map, boolean, int, int)
-     */
-    IgniteInternalFuture<Collection<GridTuple3<String, Boolean, String>>> startNodesAsync(
-        Collection<Map<String, Object>> hosts,
-        @Nullable Map<String, Object> dflts,
-        boolean restart,
-        int timeout,
-        int maxConn)
-    {
-        A.notNull(hosts, "hosts");
-
-        guard();
-
-        try {
-            // NOTE(review): meaning of the 'false' argument to create() is not visible here - confirm.
-            IgniteSshProcessor sshProcessor = IgniteComponentType.SSH.create(false);
-
-            // Start specifications grouped by host name.
-            Map<String, Collection<IgniteRemoteStartSpecification>> specsMap = specifications(hosts, dflts);
-
-            // Host name -> queue of callables still to be run for that host.
-            Map<String, ConcurrentLinkedQueue<IgniteNodeCallable>> runMap = new HashMap<>();
-
-            // Total number of callables enqueued across all hosts.
-            int nodeCallCnt = 0;
-
-            for (String host : specsMap.keySet()) {
-                InetAddress addr;
-
-                try {
-                    addr = InetAddress.getByName(host);
-                }
-                catch (UnknownHostException e) {
-                    // Caught by the outer catch and turned into a failed finished future.
-                    throw new IgniteCheckedException("Invalid host name: " + host, e);
-                }
-
-                // Nodes already present on this host, or null if none were found.
-                Collection<? extends ClusterNode> neighbors = null;
-
-                if (addr.isLoopbackAddress())
-                    neighbors = neighbors();
-                else {
-                    // Pick the neighborhood group whose first node lists this host's address in ATTR_IPS.
-                    for (Collection<ClusterNode> p : U.neighborhood(nodes()).values()) {
-                        ClusterNode node = F.first(p);
-
-                        // NOTE(review): attribute(ATTR_IPS) is dereferenced without a null check - confirm it is always set.
-                        if (node.<String>attribute(ATTR_IPS).contains(addr.getHostAddress())) {
-                            neighbors = p;
-
-                            break;
-                        }
-                    }
-                }
-
-                // Index of the first node to start; 1 means every node of a specification is started.
-                int startIdx = 1;
-
-                if (neighbors != null) {
-                    if (restart && !neighbors.isEmpty()) {
-                        try {
-                            // Kill the existing nodes (task argument 'false'); startIdx stays 1,
-                            // so the full number of nodes is started afterwards.
-                            ctx.grid().compute(forNodes(neighbors)).execute(IgniteKillTask.class, false);
-                        }
-                        catch (ClusterGroupEmptyException ignored) {
-                            // No-op, nothing to restart.
-                        }
-                    }
-                    else
-                        // Existing nodes are kept: skip as many start slots as there are neighbors.
-                        startIdx = neighbors.size() + 1;
-                }
-
-                ConcurrentLinkedQueue<IgniteNodeCallable> nodeRuns = new ConcurrentLinkedQueue<>();
-
-                runMap.put(host, nodeRuns);
-
-                for (IgniteRemoteStartSpecification spec : specsMap.get(host)) {
-                    assert spec.host().equals(host);
-
-                    // Enqueues nothing when startIdx already exceeds spec.nodes().
-                    for (int i = startIdx; i <= spec.nodes(); i++) {
-                        nodeRuns.add(sshProcessor.nodeStartCallable(spec, timeout));
-
-                        nodeCallCnt++;
-                    }
-                }
-            }
-
-            // If there is nothing to start, return finished future with empty result.
-            if (nodeCallCnt == 0)
-                return new GridFinishedFuture<Collection<GridTuple3<String, Boolean, String>>>(
-                    ctx, Collections.<GridTuple3<String, Boolean, String>>emptyList());
-
-            // Exceeding max line width for readability.
-            GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
-                fut = new GridCompoundFuture<>(
-                ctx,
-                CU.<GridTuple3<String, Boolean, String>>objectsReducer()
-            );
-
-            // Counts callables not yet started; runNextNodeCallable marks the compound
-            // future initialized once this reaches zero.
-            AtomicInteger cnt = new AtomicInteger(nodeCallCnt);
-
-            // Limit maximum simultaneous connection number per host.
-            // NOTE(review): with maxConn <= 0 no callable is started here, so the counter never
-            // reaches zero and markInitialized() is never called - confirm callers validate maxConn.
-            for (ConcurrentLinkedQueue<IgniteNodeCallable> queue : runMap.values()) {
-                for (int i = 0; i < maxConn; i++) {
-                    if (!runNextNodeCallable(queue, fut, cnt))
-                        break;
-                }
-            }
-
-            return fut;
-        }
-        catch (IgniteCheckedException e) {
-            return new GridFinishedFuture<>(ctx, e);
-        }
-        finally {
-            unguard();
-        }
-    }
-
-    /**
-     * Gets all grid nodes, other than the local one, whose {@code ATTR_MACS} attribute
-     * equals that of the local node.
-     * <p>
-     * The MACs attribute is used as the marker of a physical computer: if two nodes report
-     * the same set of network interface MACs, Ignite considers them to be running on the
-     * same physical computer.
-     *
-     * @return Grid nodes that reside on the same physical computer as local grid node.
-     */
-    private Collection<ClusterNode> neighbors() {
-        String locMacs = localNode().attribute(ATTR_MACS);
-
-        assert locMacs != null;
-
-        Collection<ClusterNode> res = new ArrayList<>(1);
-
-        // Only nodes other than the local one are candidates.
-        for (ClusterNode other : forOthers(localNode()).nodes()) {
-            boolean sameHost = locMacs.equals(other.attribute(ATTR_MACS));
-
-            if (sameHost)
-                res.add(other);
-        }
-
-        return res;
-    }
-
-    /**
-     * Polls the next callable from a host's start queue and runs it. When the started
-     * callable's future completes, the method is invoked again for the same queue, so the
-     * callables of one queue started through a single call chain run one after another.
-     *
-     * @param queue Queue of tasks to poll from.
-     * @param comp Compound future that comprise all started node tasks.
-     * @param cnt Atomic counter to check if all futures are added to compound future.
-     * @return {@code True} if task was started, {@code false} if queue was empty.
-     */
-    private boolean runNextNodeCallable(final ConcurrentLinkedQueue<IgniteNodeCallable> queue,
-        final GridCompoundFuture<GridTuple3<String, Boolean, String>, Collection<GridTuple3<String, Boolean, String>>>
-        comp,
-        final AtomicInteger cnt)
-    {
-        IgniteNodeCallable next = queue.poll();
-
-        // Queue drained: nothing left to start for this host.
-        if (next == null)
-            return false;
-
-        IgniteInternalFuture<GridTuple3<String, Boolean, String>> runFut = ctx.closure().callLocalSafe(next, true);
-
-        // Register the future with the compound before possibly marking it initialized.
-        comp.add(runFut);
-
-        // The last remaining callable has been added, so the compound future is complete in composition.
-        boolean allAdded = cnt.decrementAndGet() == 0;
-
-        if (allAdded)
-            comp.markInitialized();
-
-        // Chain the next callable of this queue to the completion of the current one.
-        runFut.listenAsync(new CI1<IgniteInternalFuture<GridTuple3<String, Boolean, String>>>() {
-            @Override public void apply(IgniteInternalFuture<GridTuple3<String, Boolean, String>> f) {
-                runNextNodeCallable(queue, comp, cnt);
-            }
-        });
-
-        return true;
-    }
-
-    /**
-     * Clears node local map: removes all entries from the {@code nodeLoc} field.
-     */
-    public void clearNodeMap() {
-        nodeLoc.clear();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
-        ctx = (GridKernalContext)in.readObject();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void writeExternal(ObjectOutput out) throws IOException {
-        // Only the kernal context is written; readExternal() reads it back.
-        out.writeObject(ctx);
-    }
-
-    /** {@inheritDoc} */
-    @Override protected Object readResolve() throws ObjectStreamException {
-        // Substitute the deserialized instance with the cluster object obtained from the restored context.
-        return ctx.grid().cluster();
-    }
-
-    /** {@inheritDoc} */
-    @Override public void addAttributes(Map<String, Object> attrs) throws IgniteCheckedException {
-        // No-op: this implementation adds nothing to the passed attribute map.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void start() throws IgniteCheckedException {
-        // No-op: this implementation does nothing on start.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void stop(boolean cancel) throws IgniteCheckedException {
-        // No-op: this implementation does nothing on stop; the cancel flag is ignored.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStart() throws IgniteCheckedException {
-        // No-op: this implementation does nothing in this callback.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onKernalStop(boolean cancel) {
-        // No-op: this implementation does nothing in this callback; the cancel flag is ignored.
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public Object collectDiscoveryData(UUID nodeId) {
-        // Always null: this implementation contributes no discovery data, regardless of the node ID.
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public void onDiscoveryDataReceived(UUID nodeId, Object data) {
-        // No-op: received discovery data is ignored by this implementation.
-    }
-
-    /** {@inheritDoc} */
-    @Override public void printMemoryStats() {
-        // No-op: this implementation prints nothing.
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public IgniteSpiNodeValidationResult validateNode(ClusterNode node) {
-        // Always null: this implementation performs no validation of its own and never produces a result.
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Nullable @Override public DiscoveryDataExchangeType discoveryDataType() {
-        // Always null: no discovery data exchange type is declared (collectDiscoveryData() also returns null).
-        return null;
-    }
-
-    /** {@inheritDoc} */
-    @Override public String toString() {
-        // Identifies the instance by the grid name taken from the kernal context.
-        return "IgniteCluster [igniteName=" + ctx.gridName() + ']';
-    }
-}