You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/08/24 14:04:43 UTC
[20/50] [abbrv] ignite git commit: IGNITE-3414: Hadoop: implemented
new weight-based map-reduce planner.
IGNITE-3414: Hadoop: implemented new weight-based map-reduce planner.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/73649386
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/73649386
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/73649386
Branch: refs/heads/ignite-2649
Commit: 736493865c1e3a56f864a01583d38e50d02b2c56
Parents: 5f57cc8
Author: vozerov-gridgain <vo...@gridgain.com>
Authored: Tue Jul 19 15:16:21 2016 +0300
Committer: vozerov-gridgain <vo...@gridgain.com>
Committed: Tue Jul 19 15:16:21 2016 +0300
----------------------------------------------------------------------
.../processors/igfs/IgfsIgniteMock.java | 492 +++++++++++
.../internal/processors/igfs/IgfsMock.java | 397 +++++++++
.../mapreduce/IgniteHadoopMapReducePlanner.java | 48 +-
.../IgniteHadoopWeightedMapReducePlanner.java | 846 +++++++++++++++++++
.../internal/processors/hadoop/HadoopUtils.java | 81 ++
.../planner/HadoopAbstractMapReducePlanner.java | 116 +++
.../planner/HadoopMapReducePlanGroup.java | 150 ++++
.../planner/HadoopMapReducePlanTopology.java | 89 ++
.../HadoopDefaultMapReducePlannerSelfTest.java | 451 +---------
.../processors/hadoop/HadoopMapReduceTest.java | 16 +-
.../processors/hadoop/HadoopPlannerMockJob.java | 168 ++++
.../HadoopWeightedMapReducePlannerTest.java | 599 +++++++++++++
.../HadoopWeightedPlannerMapReduceTest.java | 38 +
.../testsuites/IgniteHadoopTestSuite.java | 8 +-
14 files changed, 3022 insertions(+), 477 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
new file mode 100644
index 0000000..0c55595
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsIgniteMock.java
@@ -0,0 +1,492 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.IgniteAtomicLong;
+import org.apache.ignite.IgniteAtomicReference;
+import org.apache.ignite.IgniteAtomicSequence;
+import org.apache.ignite.IgniteAtomicStamped;
+import org.apache.ignite.IgniteBinary;
+import org.apache.ignite.IgniteCache;
+import org.apache.ignite.IgniteCompute;
+import org.apache.ignite.IgniteCountDownLatch;
+import org.apache.ignite.IgniteDataStreamer;
+import org.apache.ignite.IgniteEvents;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.IgniteMessaging;
+import org.apache.ignite.IgniteQueue;
+import org.apache.ignite.IgniteScheduler;
+import org.apache.ignite.IgniteSemaphore;
+import org.apache.ignite.IgniteServices;
+import org.apache.ignite.IgniteSet;
+import org.apache.ignite.IgniteTransactions;
+import org.apache.ignite.cache.affinity.Affinity;
+import org.apache.ignite.cluster.ClusterGroup;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.configuration.CacheConfiguration;
+import org.apache.ignite.configuration.CollectionConfiguration;
+import org.apache.ignite.configuration.IgniteConfiguration;
+import org.apache.ignite.configuration.NearCacheConfiguration;
+import org.apache.ignite.internal.GridKernalContext;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.cluster.IgniteClusterEx;
+import org.apache.ignite.internal.processors.cache.GridCacheUtilityKey;
+import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
+import org.apache.ignite.internal.processors.hadoop.Hadoop;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgnitePredicate;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.plugin.IgnitePlugin;
+import org.apache.ignite.plugin.PluginNotFoundException;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Mocked Ignite implementation for IGFS tests.
+ * <p>
+ * Wraps a name and a single {@link IgniteFileSystem}. Only {@link #name()}, {@link #igfsx(String)},
+ * {@link #fileSystem(String)} and {@link #fileSystems()} are functional; every other method throws
+ * {@link UnsupportedOperationException}.
+ */
+public class IgfsIgniteMock implements IgniteEx {
+ /** Name. */
+ private final String name;
+
+ /** IGFS. */
+ private final IgniteFileSystem igfs;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name returned by {@link #name()}, may be {@code null}.
+ * @param igfs IGFS instance exposed by the file system accessors of this mock.
+ */
+ public IgfsIgniteMock(@Nullable String name, IgniteFileSystem igfs) {
+ this.name = name;
+ this.igfs = igfs;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K extends GridCacheUtilityKey, V> IgniteInternalCache<K, V> utilityCache() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex(@Nullable String name) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public <K, V> IgniteInternalCache<K, V> cachex() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("unchecked")
+ @Override public Collection<IgniteInternalCache<?, ?>> cachesx(
+ @Nullable IgnitePredicate<? super IgniteInternalCache<?, ?>>... p) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean eventUserRecordable(int type) {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean allEventsUserRecordable(int[] types) {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isJmxRemoteEnabled() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isRestartEnabled() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Returns the wrapped IGFS if its name matches {@code name} according to {@code F.eq};
+ * otherwise returns {@code null}.
+ */
+ @Nullable @Override public IgniteFileSystem igfsx(@Nullable String name) {
+ return F.eq(name, igfs.name()) ? igfs : null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Hadoop hadoop() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteClusterEx cluster() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String latestVersion() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ClusterNode localNode() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public GridKernalContext context() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteLogger log() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteConfiguration configuration() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompute compute() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCompute compute(ClusterGroup grp) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteMessaging message() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteMessaging message(ClusterGroup grp) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteEvents events() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteEvents events(ClusterGroup grp) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteServices services() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteServices services(ClusterGroup grp) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService executorService() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public ExecutorService executorService(ClusterGroup grp) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteProductVersion version() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteScheduler scheduler() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> createCache(String cacheName) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> getOrCreateCache(String cacheName) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> void addCacheConfiguration(CacheConfiguration<K, V> cacheCfg) {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> createCache(CacheConfiguration<K, V> cacheCfg,
+ NearCacheConfiguration<K, V> nearCfg) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> getOrCreateCache(CacheConfiguration<K, V> cacheCfg,
+ NearCacheConfiguration<K, V> nearCfg) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> createNearCache(@Nullable String cacheName,
+ NearCacheConfiguration<K, V> nearCfg) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> getOrCreateNearCache(@Nullable String cacheName,
+ NearCacheConfiguration<K, V> nearCfg) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void destroyCache(String cacheName) {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteCache<K, V> cache(@Nullable String name) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> cacheNames() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteTransactions transactions() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K, V> IgniteDataStreamer<K, V> dataStreamer(@Nullable String cacheName) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Delegates the lookup to {@link #igfsx(String)}.
+ *
+ * @throws IllegalArgumentException If the lookup returned {@code null} for the given name.
+ */
+ @Override public IgniteFileSystem fileSystem(String name) {
+ IgniteFileSystem res = igfsx(name);
+
+ if (res == null)
+ throw new IllegalArgumentException("IGFS is not configured: " + name);
+
+ return res;
+ }
+
+ /**
+ * {@inheritDoc}
+ * <p>
+ * Always a singleton collection holding the wrapped IGFS.
+ */
+ @Override public Collection<IgniteFileSystem> fileSystems() {
+ return Collections.singleton(igfs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteAtomicSequence atomicSequence(String name, long initVal, boolean create)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteAtomicLong atomicLong(String name, long initVal, boolean create) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteAtomicReference<T> atomicReference(String name, @Nullable T initVal, boolean create)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, S> IgniteAtomicStamped<T, S> atomicStamped(String name, @Nullable T initVal,
+ @Nullable S initStamp, boolean create) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteCountDownLatch countDownLatch(String name, int cnt, boolean autoDel, boolean create)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteSemaphore semaphore(String name, int cnt, boolean failoverSafe, boolean create)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteQueue<T> queue(String name, int cap, @Nullable CollectionConfiguration cfg)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T> IgniteSet<T> set(String name, @Nullable CollectionConfiguration cfg) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T extends IgnitePlugin> T plugin(String name) throws PluginNotFoundException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteBinary binary() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void close() throws IgniteException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <K> Affinity<K> affinity(String cacheName) {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /**
+ * Throw {@link UnsupportedOperationException}.
+ * <p>
+ * Invoked as the first statement of every stubbed method, so the {@code return} statements that
+ * follow those calls are never reached and exist only to satisfy the compiler.
+ *
+ * @throws UnsupportedOperationException Always.
+ */
+ private static void throwUnsupported() {
+ throw new UnsupportedOperationException("Should not be called!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
----------------------------------------------------------------------
diff --git a/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
new file mode 100644
index 0000000..dccab4a
--- /dev/null
+++ b/modules/core/src/test/java/org/apache/ignite/internal/processors/igfs/IgfsMock.java
@@ -0,0 +1,397 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.igfs;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.configuration.FileSystemConfiguration;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsFile;
+import org.apache.ignite.igfs.IgfsMetrics;
+import org.apache.ignite.igfs.IgfsOutputStream;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.igfs.IgfsPathSummary;
+import org.apache.ignite.igfs.mapreduce.IgfsRecordResolver;
+import org.apache.ignite.igfs.mapreduce.IgfsTask;
+import org.apache.ignite.igfs.secondary.IgfsSecondaryFileSystem;
+import org.apache.ignite.lang.IgniteFuture;
+import org.apache.ignite.lang.IgniteUuid;
+import org.jetbrains.annotations.Nullable;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Mocked IGFS implementation for IGFS tests.
+ * <p>
+ * Only {@link #name()} is functional; every other method throws {@link UnsupportedOperationException}.
+ */
+public class IgfsMock implements IgfsEx {
+ /** Name. */
+ private final String name;
+
+ /**
+ * Constructor.
+ *
+ * @param name Name returned by {@link #name()}, may be {@code null}.
+ */
+ public IgfsMock(@Nullable String name) {
+ this.name = name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void stop(boolean cancel) {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsContext context() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsPaths proxyPaths() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize, int seqReadsBeforePrefetch) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsInputStreamAdapter open(IgfsPath path) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsInputStreamAdapter open(IgfsPath path, int bufSize) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsStatus globalSpace() throws IgniteCheckedException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void globalSampling(@Nullable Boolean val) throws IgniteCheckedException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public Boolean globalSampling() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsLocalMetrics localMetrics() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long groupBlockSize() {
+ throwUnsupported();
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public String clientLogDirectory() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void clientLogDirectory(String logDir) {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean evictExclude(IgfsPath path, boolean primary) {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteUuid nextAffinityKey() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isProxy(URI path) {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsSecondaryFileSystem asSecondary() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String name() {
+ return name;
+ }
+
+ /** {@inheritDoc} */
+ @Override public FileSystemConfiguration configuration() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsPathSummary summary(IgfsPath path) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsOutputStream create(IgfsPath path, boolean overwrite) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, int replication,
+ long blockSize, @Nullable Map<String, String> props) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsOutputStream create(IgfsPath path, int bufSize, boolean overwrite, @Nullable IgniteUuid affKey,
+ int replication, long blockSize, @Nullable Map<String, String> props) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsOutputStream append(IgfsPath path, boolean create) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsOutputStream append(IgfsPath path, int bufSize, boolean create,
+ @Nullable Map<String, String> props) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void setTimes(IgfsPath path, long accessTime, long modificationTime) throws IgniteException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len, long maxLen)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsMetrics metrics() throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void resetMetrics() throws IgniteException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public long size(IgfsPath path) throws IgniteException {
+ throwUnsupported();
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void format() throws IgniteException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> R execute(IgfsTask<T, R> task, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, @Nullable T arg) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <T, R> R execute(Class<? extends IgfsTask<T, R>> taskCls, @Nullable IgfsRecordResolver rslvr,
+ Collection<IgfsPath> paths, boolean skipNonExistentFiles, long maxRangeLen, @Nullable T arg)
+ throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean exists(IgfsPath path) {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgfsFile update(IgfsPath path, Map<String, String> props) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void rename(IgfsPath src, IgfsPath dest) throws IgniteException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean delete(IgfsPath path, boolean recursive) throws IgniteException {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mkdirs(IgfsPath path) throws IgniteException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public void mkdirs(IgfsPath path, @Nullable Map<String, String> props) throws IgniteException {
+ throwUnsupported();
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsPath> listPaths(IgfsPath path) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<IgfsFile> listFiles(IgfsPath path) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Nullable @Override public IgfsFile info(IgfsPath path) throws IgniteException {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public long usedSpaceSize() throws IgniteException {
+ throwUnsupported();
+
+ return 0;
+ }
+
+ /** {@inheritDoc} */
+ @Override public IgniteFileSystem withAsync() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean isAsync() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} */
+ @Override public <R> IgniteFuture<R> future() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /**
+ * Throw {@link UnsupportedOperationException}.
+ * <p>
+ * Invoked as the first statement of every stubbed method, so the {@code return} statements that
+ * follow those calls are never reached and exist only to satisfy the compiler.
+ *
+ * @throws UnsupportedOperationException Always.
+ */
+ private static void throwUnsupported() {
+ throw new UnsupportedOperationException("Should not be called!");
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
index 287b5ec..d4a44fa 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
@@ -26,10 +26,9 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.UUID;
-import org.apache.ignite.Ignite;
+
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
-import org.apache.ignite.IgniteLogger;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.igfs.IgfsBlockLocation;
import org.apache.ignite.igfs.IgfsPath;
@@ -38,14 +37,11 @@ import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
import org.apache.ignite.internal.processors.hadoop.HadoopJob;
import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
import org.apache.ignite.internal.processors.igfs.IgfsEx;
import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.internal.util.typedef.internal.U;
-import org.apache.ignite.resources.IgniteInstanceResource;
-import org.apache.ignite.resources.LoggerResource;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -54,16 +50,7 @@ import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
/**
* Default map-reduce planner implementation.
*/
-public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner {
- /** Injected grid. */
- @IgniteInstanceResource
- private Ignite ignite;
-
- /** Logger. */
- @SuppressWarnings("UnusedDeclaration")
- @LoggerResource
- private IgniteLogger log;
-
+public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner {
/** {@inheritDoc} */
@Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
@Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
@@ -98,7 +85,7 @@ public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner {
Iterable<HadoopInputSplit> splits) throws IgniteCheckedException {
Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
- Map<String, Collection<UUID>> nodes = hosts(top);
+ Map<String, Collection<UUID>> nodes = groupByHost(top);
Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
@@ -129,33 +116,6 @@ public class IgniteHadoopMapReducePlanner implements HadoopMapReducePlanner {
}
/**
- * Groups nodes by host names.
- *
- * @param top Topology to group.
- * @return Map.
- */
- private static Map<String, Collection<UUID>> hosts(Collection<ClusterNode> top) {
- Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
-
- for (ClusterNode node : top) {
- for (String host : node.hostNames()) {
- Collection<UUID> nodeIds = grouped.get(host);
-
- if (nodeIds == null) {
- // Expecting 1-2 nodes per host.
- nodeIds = new ArrayList<>(2);
-
- grouped.put(host, nodeIds);
- }
-
- nodeIds.add(node.id());
- }
- }
-
- return grouped;
- }
-
- /**
* Determine the best node for this split.
*
* @param split Split.
http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
new file mode 100644
index 0000000..27ffc19
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopWeightedMapReducePlanner.java
@@ -0,0 +1,846 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.hadoop.mapreduce;
+
+import org.apache.ignite.IgniteCheckedException;
+import org.apache.ignite.IgniteException;
+import org.apache.ignite.IgniteFileSystem;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteEx;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopJob;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.HadoopUtils;
+import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanGroup;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopMapReducePlanTopology;
+import org.apache.ignite.internal.processors.igfs.IgfsEx;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.IdentityHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Map-reduce planner which assigns mappers and reducers based on their "weights". Weight describes how much resources
+ * are required to execute particular map or reduce task.
+ * <p>
+ * Plan creation consists of two steps: assigning mappers and assigning reducers.
+ * <p>
+ * Mappers are assigned based on input split data location. For each input split we search for nodes where
+ * its data is stored. Planner tries to assign mappers to their affinity nodes first. This process is governed by two
+ * properties:
+ * <ul>
+ * <li><b>{@code localMapperWeight}</b> - weight of a map task when it is executed on an affinity node;</li>
+ * <li><b>{@code remoteMapperWeight}</b> - weight of a map task when it is executed on a non-affinity node.</li>
+ * </ul>
+ * The planning algorithm assigns mappers so that the total resulting weight on all nodes is the minimum possible.
+ * <p>
+ * Reducers are assigned differently. First we try to distribute reducers across nodes with mappers. This approach
+ * could minimize expensive data transfer over network. Reducer assigned to a node with mapper is considered
+ * <b>{@code local}</b>. Otherwise it is considered <b>{@code remote}</b>. This process continues until a certain weight
+ * threshold is reached, which means that the current node is already too busy and should not have higher priority over
+ * other nodes any more. The threshold can be configured using the <b>{@code preferLocalReducerThresholdWeight}</b> property.
+ * <p>
+ * When local reducer threshold is reached on all nodes, we distribute remaining reducers based on their local and
+ * remote weights in the same way as it is done for mappers. This process is governed by two
+ * properties:
+ * <ul>
+ * <li><b>{@code localReducerWeight}</b> - weight of a reduce task when it is executed on a node with mappers;</li>
+ * <li><b>{@code remoteReducerWeight}</b> - weight of a reduce task when it is executed on a node without mappers.</li>
+ * </ul>
+ */
+public class IgniteHadoopWeightedMapReducePlanner extends HadoopAbstractMapReducePlanner {
+ /** Default local mapper weight. */
+ public static final int DFLT_LOC_MAPPER_WEIGHT = 100;
+
+ /** Default remote mapper weight. */
+ public static final int DFLT_RMT_MAPPER_WEIGHT = 100;
+
+ /** Default local reducer weight. */
+ public static final int DFLT_LOC_REDUCER_WEIGHT = 100;
+
+ /** Default remote reducer weight. */
+ public static final int DFLT_RMT_REDUCER_WEIGHT = 100;
+
+ /** Default reducer migration threshold weight. */
+ public static final int DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT = 200;
+
+ /** Local mapper weight. */
+ private int locMapperWeight = DFLT_LOC_MAPPER_WEIGHT;
+
+ /** Remote mapper weight. */
+ private int rmtMapperWeight = DFLT_RMT_MAPPER_WEIGHT;
+
+ /** Local reducer weight. */
+ private int locReducerWeight = DFLT_LOC_REDUCER_WEIGHT;
+
+ /** Remote reducer weight. */
+ private int rmtReducerWeight = DFLT_RMT_REDUCER_WEIGHT;
+
+ /** Reducer migration threshold weight. */
+ private int preferLocReducerThresholdWeight = DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT;
+
+    /** {@inheritDoc} */
+    @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> nodes,
+        @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
+        // Splits are ordered up-front; the same ordered list drives both mapper and reducer assignment.
+        List<HadoopInputSplit> sortedSplits = HadoopUtils.sortInputSplits(job.input());
+
+        int rdcCnt = job.info().reducers();
+
+        if (rdcCnt < 0)
+            throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
+
+        // A fresh topology is built per call; group weights accumulate in it across both phases below.
+        HadoopMapReducePlanTopology planTop = topology(nodes);
+
+        Mappers mapperAssignment = assignMappers(sortedSplits, planTop);
+
+        Map<UUID, int[]> reducerAssignment = assignReducers(sortedSplits, planTop, mapperAssignment, rdcCnt);
+
+        // Note: the old plan argument is not consulted by this planner.
+        return new HadoopDefaultMapReducePlan(mapperAssignment.nodeToSplits, reducerAssignment);
+    }
+
+    /**
+     * Picks an execution node for every input split.
+     * <p>
+     * For each split the affinity candidates are resolved first and then handed to
+     * {@link #bestMapperNode(Collection, HadoopMapReducePlanTopology)}, which also updates group weights.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @return Mappers (split-to-node and node-to-splits mappings).
+     * @throws IgniteCheckedException If failed.
+     */
+    private Mappers assignMappers(Collection<HadoopInputSplit> splits,
+        HadoopMapReducePlanTopology top) throws IgniteCheckedException {
+        Mappers mappers = new Mappers();
+
+        for (HadoopInputSplit split : splits) {
+            // Affinity candidates (IGFS-based when available, host-based otherwise) feed the weighted choice.
+            UUID bestNodeId = bestMapperNode(affinityNodesForSplit(split, top), top);
+
+            assert bestNodeId != null;
+
+            mappers.add(split, bestNodeId);
+        }
+
+        return mappers;
+    }
+
+    /**
+     * Get affinity nodes for the given input split.
+     * <p>
+     * IGFS affinity is used when it can be resolved; in that case the collection returned by
+     * {@link #igfsAffinityNodesForSplit(HadoopInputSplit)} is passed through unchanged.
+     * <p>
+     * Otherwise the nodes of all topology groups matching the split hosts are returned. Order in the returned
+     * collection *is* significant: the 1st node is treated as the preferred one for scheduling. Note that in this
+     * fallback path every node is keyed with the same split length, so the resulting order is effectively the
+     * natural order of node IDs.
+     *
+     * @param split Split.
+     * @param top Topology.
+     * @return Affinity nodes (possibly empty).
+     * @throws IgniteCheckedException If failed.
+     */
+    private Collection<UUID> affinityNodesForSplit(HadoopInputSplit split, HadoopMapReducePlanTopology top)
+        throws IgniteCheckedException {
+        Collection<UUID> igfsNodeIds = igfsAffinityNodesForSplit(split);
+
+        if (igfsNodeIds != null)
+            return igfsNodeIds;
+
+        // Split length does not depend on the host, so it is computed once rather than per host.
+        long len = split instanceof HadoopFileBlock ? ((HadoopFileBlock)split).length() : 0L;
+
+        Map<NodeIdAndLength, UUID> res = new TreeMap<>();
+
+        for (String host : split.hosts()) {
+            HadoopMapReducePlanGroup grp = top.groupForHost(host);
+
+            // Hosts unknown to the topology simply contribute no candidates.
+            if (grp != null) {
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    UUID nodeId = grp.nodeId(i);
+
+                    res.put(new NodeIdAndLength(nodeId, len), nodeId);
+                }
+            }
+        }
+
+        // Preserve the sorted iteration order while exposing a plain collection of IDs.
+        return new LinkedHashSet<>(res.values());
+    }
+
+    /**
+     * Resolve IGFS affinity nodes for the split, if the split is backed by an IGFS file reachable through this
+     * Ignite instance.
+     * <p>
+     * When the split spans several IGFS block locations, node IDs are ordered by the total block length they hold,
+     * largest first, so that the 1st node of the returned collection is the preferable one for scheduling. When
+     * there is exactly one block location, its node ID collection is returned as is.
+     *
+     * @param split Input split.
+     * @return IGFS affinity or {@code null} if IGFS affinity cannot be determined for this split.
+     * @throws IgniteCheckedException If failed.
+     */
+    @Nullable private Collection<UUID> igfsAffinityNodesForSplit(HadoopInputSplit split) throws IgniteCheckedException {
+        if (!(split instanceof HadoopFileBlock))
+            return null;
+
+        HadoopFileBlock blk = (HadoopFileBlock)split;
+
+        if (!IgniteFileSystem.IGFS_SCHEME.equalsIgnoreCase(blk.file().getScheme()))
+            return null;
+
+        HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(blk.file().getAuthority());
+
+        // The endpoint must point to the grid this planner is injected into.
+        if (!F.eq(ignite.name(), endpoint.grid()))
+            return null;
+
+        IgfsEx igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
+
+        if (igfs == null || igfs.isProxy(blk.file()))
+            return null;
+
+        IgfsPath path = new IgfsPath(blk.file());
+
+        if (!igfs.exists(path))
+            return null;
+
+        Collection<IgfsBlockLocation> locs;
+
+        try {
+            locs = igfs.affinity(path, blk.start(), blk.length());
+        }
+        catch (IgniteException e) {
+            throw new IgniteCheckedException("Failed to get IGFS file block affinity [path=" + path +
+                ", start=" + blk.start() + ", len=" + blk.length() + ']', e);
+        }
+
+        assert locs != null;
+
+        if (locs.size() == 1)
+            return locs.iterator().next().nodeIds();
+
+        // Accumulate, per node, the total length of the blocks it holds.
+        Map<UUID, Long> lenById = new HashMap<>();
+
+        for (IgfsBlockLocation loc : locs) {
+            for (UUID id : loc.nodeIds()) {
+                Long prev = lenById.get(id);
+
+                lenById.put(id, prev == null ? loc.length() : loc.length() + prev);
+            }
+        }
+
+        // TreeMap keyed by (length desc, id) yields the "most local first" order.
+        Map<NodeIdAndLength, UUID> sorted = new TreeMap<>();
+
+        for (Map.Entry<UUID, Long> e : lenById.entrySet())
+            sorted.put(new NodeIdAndLength(e.getKey(), e.getValue()), e.getKey());
+
+        return new LinkedHashSet<>(sorted.values());
+    }
+
+    /**
+     * Choose the node that should run a mapper.
+     * <p>
+     * Every group is scored as its current weight plus the weight of the would-be mapper (local weight when the
+     * group contains an affinity node, remote weight otherwise). The group with the lowest score wins; on equal
+     * scores the group with the higher {@link MapperPriority} wins. The winning group's weight is updated.
+     *
+     * @param affIds Affinity node IDs.
+     * @param top Topology.
+     * @return ID of the chosen node.
+     */
+    private UUID bestMapperNode(@Nullable Collection<UUID> affIds, HadoopMapReducePlanTopology top) {
+        // The first affinity node is the most preferable one.
+        UUID prioAffId = F.first(affIds);
+
+        HadoopMapReducePlanGroup bestGrp = null;
+        MapperPriority bestPrio = MapperPriority.NORMAL;
+        int bestWeight = Integer.MAX_VALUE;
+
+        for (HadoopMapReducePlanGroup grp : top.groups()) {
+            MapperPriority prio = groupPriority(grp, affIds, prioAffId);
+
+            int mapperWeight = prio == MapperPriority.NORMAL ? rmtMapperWeight : locMapperWeight;
+            int weight = grp.weight() + mapperWeight;
+
+            boolean lighter = weight < bestWeight;
+            boolean sameWeightBetterPrio = weight == bestWeight && prio.value() > bestPrio.value();
+
+            if (bestGrp == null || lighter || sameWeightBetterPrio) {
+                bestGrp = grp;
+                bestPrio = prio;
+                bestWeight = weight;
+            }
+        }
+
+        assert bestGrp != null;
+
+        // Remember the new load so that subsequent assignments see it.
+        bestGrp.weight(bestWeight);
+
+        return bestMapperNodeForGroup(bestGrp, bestPrio, affIds, prioAffId);
+    }
+
+    /**
+     * Pick a concrete node inside the chosen group.
+     * <p>
+     * A single-node group always yields that node. Otherwise: {@code NORMAL} picks a random node, {@code HIGH}
+     * picks a random affinity node of the group, and {@code HIGHEST} picks the priority affinity node (falling
+     * back to the first node of the group if it is not found).
+     *
+     * @param grp Group.
+     * @param priority Priority.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity ID.
+     * @return Best node ID in the group.
+     */
+    private static UUID bestMapperNodeForGroup(HadoopMapReducePlanGroup grp, MapperPriority priority,
+        @Nullable Collection<UUID> affIds, @Nullable UUID prioAffId) {
+        // Common case: one node per host, nothing to choose from.
+        if (grp.single())
+            return grp.nodeId(0);
+
+        // Rare case: several nodes are started on the same host.
+        switch (priority) {
+            case NORMAL:
+                // Any node will do.
+                return grp.nodeId(ThreadLocalRandom.current().nextInt(grp.nodeCount()));
+
+            case HIGH: {
+                // Any affinity node will do.
+                assert affIds != null;
+
+                List<Integer> affIdxs = new ArrayList<>();
+
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    if (affIds.contains(grp.nodeId(i)))
+                        affIdxs.add(i);
+                }
+
+                int idx = affIdxs.get(ThreadLocalRandom.current().nextInt(affIdxs.size()));
+
+                return grp.nodeId(idx);
+            }
+
+            default: {
+                // Look for the primary affinity node.
+                assert prioAffId != null;
+
+                int idx = 0;
+
+                for (int i = 0; i < grp.nodeCount(); i++) {
+                    if (F.eq(grp.nodeId(i), prioAffId)) {
+                        idx = i;
+
+                        break;
+                    }
+                }
+
+                assert priority == MapperPriority.HIGHEST;
+
+                return grp.nodeId(idx);
+            }
+        }
+    }
+
+    /**
+     * Assign reducers to nodes and number them.
+     * <p>
+     * Per-node reducer counts are computed first; then consecutive reducer indexes starting from {@code 0} are
+     * handed out to nodes in the iteration order of the count map.
+     *
+     * @param splits Input splits.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Map from node ID to the reducer indexes assigned to it.
+     */
+    private Map<UUID, int[]> assignReducers(Collection<HadoopInputSplit> splits, HadoopMapReducePlanTopology top,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> cntByNode = assignReducers0(top, splits, mappers, reducerCnt);
+
+        Map<UUID, int[]> res = new HashMap<>(cntByNode.size());
+
+        int nextIdx = 0;
+
+        for (Map.Entry<UUID, Integer> e : cntByNode.entrySet()) {
+            int[] idxs = new int[e.getValue()];
+
+            for (int i = 0; i < idxs.length; i++)
+                idxs[i] = nextIdx + i;
+
+            nextIdx += idxs.length;
+
+            res.put(e.getKey(), idxs);
+        }
+
+        // Every requested reducer must have been placed exactly once.
+        assert reducerCnt == nextIdx : reducerCnt + " != " + nextIdx;
+
+        return res;
+    }
+
+    /**
+     * Compute the number of reducers per node.
+     * <p>
+     * Reducers are first spread evenly between splits and placed on the nodes running the corresponding mappers
+     * while the local-reducer threshold permits. Whatever could not be placed this way is distributed by group
+     * weight, least loaded group first.
+     * <p>
+     * If there are no input splits at all, there are no mapper nodes to prefer, so all reducers are distributed
+     * by weight. This also avoids splitting reducers between zero splits, which would be a division by zero.
+     *
+     * @param top Topology.
+     * @param splits Input splits.
+     * @param mappers Mappers.
+     * @param reducerCnt Reducer count.
+     * @return Map from node ID to the number of reducers assigned to it.
+     */
+    private Map<UUID, Integer> assignReducers0(HadoopMapReducePlanTopology top, Collection<HadoopInputSplit> splits,
+        Mappers mappers, int reducerCnt) {
+        Map<UUID, Integer> res = new HashMap<>();
+
+        // No splits - no "local" placement is possible; fall back to weight-based distribution only.
+        if (splits.isEmpty()) {
+            if (reducerCnt > 0)
+                assignRemoteReducers(reducerCnt, top, mappers, res);
+
+            return res;
+        }
+
+        // Assign reducers to splits.
+        Map<HadoopInputSplit, Integer> splitToReducerCnt = assignReducersToSplits(splits, reducerCnt);
+
+        // Assign as much local reducers as possible.
+        int remaining = 0;
+
+        for (Map.Entry<HadoopInputSplit, Integer> entry : splitToReducerCnt.entrySet()) {
+            HadoopInputSplit split = entry.getKey();
+            int cnt = entry.getValue();
+
+            if (cnt > 0) {
+                int assigned = assignLocalReducers(split, cnt, top, mappers, res);
+
+                assert assigned <= cnt;
+
+                remaining += cnt - assigned;
+            }
+        }
+
+        // Assign the rest reducers.
+        if (remaining > 0)
+            assignRemoteReducers(remaining, top, mappers, res);
+
+        return res;
+    }
+
+    /**
+     * Place reducers of a split on the node which runs the split's mapper.
+     * <p>
+     * Reducers are added one by one, each increasing the group weight by the local reducer weight, until either
+     * all {@code cnt} reducers are placed or the group weight reaches the prefer-local threshold.
+     *
+     * @param split Split.
+     * @param cnt Reducer count.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map (updated in place).
+     * @return Number of locally assigned reducers.
+     */
+    private int assignLocalReducers(HadoopInputSplit split, int cnt, HadoopMapReducePlanTopology top, Mappers mappers,
+        Map<UUID, Integer> resMap) {
+        // Node running the mapper for this split.
+        UUID nodeId = mappers.splitToNode.get(split);
+
+        assert nodeId != null;
+
+        // Group (physical machine) of that node.
+        HadoopMapReducePlanGroup grp = top.groupForId(nodeId);
+
+        assert grp != null;
+
+        int assigned = 0;
+
+        for (; assigned < cnt && grp.weight() < preferLocReducerThresholdWeight; assigned++)
+            grp.weight(grp.weight() + locReducerWeight);
+
+        if (assigned == 0)
+            return 0;
+
+        Integer prev = resMap.get(nodeId);
+
+        resMap.put(nodeId, prev == null ? assigned : prev + assigned);
+
+        return assigned;
+    }
+
+    /**
+     * Distribute reducers by group weight, always taking the least loaded group next.
+     * <p>
+     * Inside the chosen group a random node with assigned mappers is preferred (and the local reducer weight is
+     * charged); if the group has no such node, a random node is taken and the remote reducer weight is charged.
+     *
+     * @param cnt Number of reducers to assign.
+     * @param top Topology.
+     * @param mappers Mappers.
+     * @param resMap Reducers result map (updated in place).
+     */
+    private void assignRemoteReducers(int cnt, HadoopMapReducePlanTopology top, Mappers mappers,
+        Map<UUID, Integer> resMap) {
+        TreeSet<HadoopMapReducePlanGroup> byWeight = new TreeSet<>(new GroupWeightComparator());
+
+        byWeight.addAll(top.groups());
+
+        for (int assigned = 0; assigned < cnt; assigned++) {
+            // The least loaded machine.
+            HadoopMapReducePlanGroup grp = byWeight.first();
+
+            // Nodes of the group which already run mappers.
+            List<UUID> mapperNodeIds = new ArrayList<>(2);
+
+            for (int i = 0; i < grp.nodeCount(); i++) {
+                UUID nodeId = grp.nodeId(i);
+
+                if (mappers.nodeToSplits.containsKey(nodeId))
+                    mapperNodeIds.add(nodeId);
+            }
+
+            boolean loc = !mapperNodeIds.isEmpty();
+
+            ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+            UUID id = loc ?
+                mapperNodeIds.get(rnd.nextInt(mapperNodeIds.size())) :
+                grp.nodeId(rnd.nextInt(grp.nodeCount()));
+
+            int newWeight = grp.weight() + (loc ? locReducerWeight : rmtReducerWeight);
+
+            // The weight is part of the sort key, so the group must leave the set while it changes.
+            boolean rmv = byWeight.remove(grp);
+
+            assert rmv;
+
+            grp.weight(newWeight);
+
+            boolean add = byWeight.add(grp);
+
+            assert add;
+
+            Integer prev = resMap.get(id);
+
+            resMap.put(id, prev == null ? 1 : prev + 1);
+        }
+    }
+
+    /**
+     * Comparator ordering groups by ascending weight; groups of equal weight are ordered by their MAC addresses
+     * string so that distinct groups never compare as equal.
+     */
+    private static class GroupWeightComparator implements Comparator<HadoopMapReducePlanGroup> {
+        /** {@inheritDoc} */
+        @Override public int compare(HadoopMapReducePlanGroup first, HadoopMapReducePlanGroup second) {
+            // Integer.compare() instead of subtraction: the difference of two ints may overflow.
+            int res = Integer.compare(first.weight(), second.weight());
+
+            return res != 0 ? res : first.macs().compareTo(second.macs());
+        }
+    }
+
+    /**
+     * Spread reducers between splits as evenly as possible.
+     * <p>
+     * Every split receives {@code reducerCnt / splits.size()} reducers; the first
+     * {@code reducerCnt % splits.size()} splits (in iteration order) receive one more.
+     * <p>
+     * NOTE: the split collection must not be empty, as its size is used as a divisor.
+     *
+     * @param splits Splits.
+     * @param reducerCnt Reducer count.
+     * @return Map from input split to reducer count (keys are compared by identity).
+     */
+    private Map<HadoopInputSplit, Integer> assignReducersToSplits(Collection<HadoopInputSplit> splits,
+        int reducerCnt) {
+        Map<HadoopInputSplit, Integer> res = new IdentityHashMap<>(splits.size());
+
+        int perSplit = reducerCnt / splits.size();
+        int left = reducerCnt % splits.size();
+
+        for (HadoopInputSplit split : splits) {
+            boolean extra = left > 0;
+
+            if (extra)
+                left--;
+
+            res.put(split, extra ? perSplit + 1 : perSplit);
+        }
+
+        assert left == 0;
+
+        return res;
+    }
+
+    /**
+     * Determine how attractive a group is for a mapper with the given affinity.
+     * <p>
+     * {@code HIGHEST} - the group contains the priority affinity node; {@code HIGH} - the group contains some
+     * other affinity node; {@code NORMAL} - the group contains no affinity nodes (or there is no affinity at all).
+     *
+     * @param grp Group.
+     * @param affIds Affinity IDs.
+     * @param prioAffId Priority affinity ID.
+     * @return Group priority.
+     */
+    private static MapperPriority groupPriority(HadoopMapReducePlanGroup grp, @Nullable Collection<UUID> affIds,
+        @Nullable UUID prioAffId) {
+        assert F.isEmpty(affIds) ? prioAffId == null : prioAffId == F.first(affIds);
+        assert grp != null;
+
+        if (F.isEmpty(affIds))
+            return MapperPriority.NORMAL;
+
+        MapperPriority res = MapperPriority.NORMAL;
+
+        for (int i = 0; i < grp.nodeCount(); i++) {
+            UUID id = grp.nodeId(i);
+
+            if (!affIds.contains(id))
+                continue;
+
+            // Nothing can beat the priority affinity node - stop searching.
+            if (F.eq(prioAffId, id))
+                return MapperPriority.HIGHEST;
+
+            res = MapperPriority.HIGH;
+        }
+
+        return res;
+    }
+
+ /**
+ * Get local mapper weight. This weight is added to a node when a mapper is assigned and its input split data is
+ * located on this node (at least partially).
+ * <p>
+ * Defaults to {@link #DFLT_LOC_MAPPER_WEIGHT}.
+ *
+ * @return Local mapper weight.
+ */
+ public int getLocalMapperWeight() {
+ return locMapperWeight;
+ }
+
+ /**
+ * Set local mapper weight. See {@link #getLocalMapperWeight()} for more information.
+ *
+ * @param locMapperWeight Local mapper weight.
+ */
+ public void setLocalMapperWeight(int locMapperWeight) {
+ this.locMapperWeight = locMapperWeight;
+ }
+
+ /**
+ * Get remote mapper weight. This weight is added to a node when a mapper is assigned, but its input
+ * split data is not located on this node.
+ * <p>
+ * Defaults to {@link #DFLT_RMT_MAPPER_WEIGHT}.
+ *
+ * @return Remote mapper weight.
+ */
+ public int getRemoteMapperWeight() {
+ return rmtMapperWeight;
+ }
+
+ /**
+ * Set remote mapper weight. See {@link #getRemoteMapperWeight()} for more information.
+ *
+ * @param rmtMapperWeight Remote mapper weight.
+ */
+ public void setRemoteMapperWeight(int rmtMapperWeight) {
+ this.rmtMapperWeight = rmtMapperWeight;
+ }
+
+ /**
+ * Get local reducer weight. This weight is added to a node when a reducer is assigned and the node has at least
+ * one assigned mapper.
+ * <p>
+ * Defaults to {@link #DFLT_LOC_REDUCER_WEIGHT}.
+ *
+ * @return Local reducer weight.
+ */
+ public int getLocalReducerWeight() {
+ return locReducerWeight;
+ }
+
+ /**
+ * Set local reducer weight. See {@link #getLocalReducerWeight()} for more information.
+ *
+ * @param locReducerWeight Local reducer weight.
+ */
+ public void setLocalReducerWeight(int locReducerWeight) {
+ this.locReducerWeight = locReducerWeight;
+ }
+
+ /**
+ * Get remote reducer weight. This weight is added to a node when a reducer is assigned, but the node doesn't have
+ * any assigned mappers.
+ * <p>
+ * Defaults to {@link #DFLT_RMT_REDUCER_WEIGHT}.
+ *
+ * @return Remote reducer weight.
+ */
+ public int getRemoteReducerWeight() {
+ return rmtReducerWeight;
+ }
+
+ /**
+ * Set remote reducer weight. See {@link #getRemoteReducerWeight()} for more information.
+ *
+ * @param rmtReducerWeight Remote reducer weight.
+ */
+ public void setRemoteReducerWeight(int rmtReducerWeight) {
+ this.rmtReducerWeight = rmtReducerWeight;
+ }
+
+ /**
+ * Get reducer migration threshold weight. When threshold is reached, a node with mappers is no longer considered
+ * as preferred for further reducer assignments.
+ * <p>
+ * Defaults to {@link #DFLT_PREFER_LOCAL_REDUCER_THRESHOLD_WEIGHT}.
+ *
+ * @return Reducer migration threshold weight.
+ */
+ public int getPreferLocalReducerThresholdWeight() {
+ return preferLocReducerThresholdWeight;
+ }
+
+ /**
+ * Set reducer migration threshold weight. See {@link #getPreferLocalReducerThresholdWeight()} for more
+ * information.
+ *
+ * @param reducerMigrationThresholdWeight Reducer migration threshold weight.
+ */
+ public void setPreferLocalReducerThresholdWeight(int reducerMigrationThresholdWeight) {
+ this.preferLocReducerThresholdWeight = reducerMigrationThresholdWeight;
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(IgniteHadoopWeightedMapReducePlanner.class, this);
+ }
+
+ /**
+ * Node ID and length.
+ */
+ private static class NodeIdAndLength implements Comparable<NodeIdAndLength> {
+ /** Node ID. */
+ private final UUID id;
+
+ /** Length. */
+ private final long len;
+
+ /**
+ * Constructor.
+ *
+ * @param id Node ID.
+ * @param len Length.
+ */
+ public NodeIdAndLength(UUID id, long len) {
+ this.id = id;
+ this.len = len;
+ }
+
+ /** {@inheritDoc} */
+ @SuppressWarnings("NullableProblems")
+ @Override public int compareTo(NodeIdAndLength obj) {
+ long res = len - obj.len;
+
+ if (res > 0)
+ return -1;
+ else if (res < 0)
+ return 1;
+ else
+ return id.compareTo(obj.id);
+ }
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return id.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj instanceof NodeIdAndLength && F.eq(id, ((NodeIdAndLength)obj).id);
+ }
+ }
+
+    /**
+     * Result of mapper assignment, indexed in both directions.
+     */
+    private static class Mappers {
+        /** Node-to-splits map. */
+        private final Map<UUID, Collection<HadoopInputSplit>> nodeToSplits = new HashMap<>();
+
+        /** Split-to-node map (splits are compared by identity). */
+        private final Map<HadoopInputSplit, UUID> splitToNode = new IdentityHashMap<>();
+
+        /**
+         * Record that the split is to be processed on the node.
+         *
+         * @param split Split.
+         * @param node Node.
+         */
+        public void add(HadoopInputSplit split, UUID node) {
+            splitToNode.put(split, node);
+
+            Collection<HadoopInputSplit> assigned = nodeToSplits.get(node);
+
+            if (assigned == null)
+                nodeToSplits.put(node, assigned = new HashSet<>());
+
+            assigned.add(split);
+        }
+    }
+
+ /**
+ * Mapper priority enumeration. A greater {@link #value()} means a more preferable group; it is used to break
+ * ties between groups of equal resulting weight.
+ */
+ private enum MapperPriority {
+ /** Normal node: the group contains no affinity nodes for the split. */
+ NORMAL(0),
+
+ /** (likely) Affinity node. */
+ HIGH(1),
+
+ /** (likely) Affinity node with the highest priority (e.g. because it hosts more data than other nodes). */
+ HIGHEST(2);
+
+ /** Numeric value used for comparing priorities. */
+ private final int val;
+
+ /**
+ * Constructor.
+ *
+ * @param val Numeric value of the priority.
+ */
+ MapperPriority(int val) {
+ this.val = val;
+ }
+
+ /**
+ * @return Numeric value of the priority.
+ */
+ public int value() {
+ return val;
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
index 3fa963f..44d871a 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopUtils.java
@@ -25,8 +25,12 @@ import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -352,4 +356,81 @@ public class HadoopUtils {
}
}
+    /**
+     * Order input splits by length, longest first. Splits of equal length keep the relative order in which
+     * they are returned by the passed collection. Splits which are not {@link HadoopFileBlock}s are treated as
+     * having zero length.
+     *
+     * @param splits Splits.
+     * @return Sorted splits.
+     */
+    public static List<HadoopInputSplit> sortInputSplits(Collection<HadoopInputSplit> splits) {
+        TreeSet<SplitSortWrapper> ordered = new TreeSet<>();
+
+        // Sequence number makes every wrapper unique and preserves encounter order among equal lengths.
+        int seq = 0;
+
+        for (HadoopInputSplit split : splits) {
+            long len = 0;
+
+            if (split instanceof HadoopFileBlock)
+                len = ((HadoopFileBlock)split).length();
+
+            ordered.add(new SplitSortWrapper(seq++, split, len));
+        }
+
+        List<HadoopInputSplit> res = new ArrayList<>(ordered.size());
+
+        for (SplitSortWrapper wrapper : ordered)
+            res.add(wrapper.split);
+
+        return res;
+    }
+
+    /**
+     * Split wrapper for sorting. Natural ordering puts longer splits first; wrappers of equal length are ordered
+     * by ascending unique ID.
+     */
+    private static class SplitSortWrapper implements Comparable<SplitSortWrapper> {
+        /** Unique ID. */
+        private final int id;
+
+        /** Split. */
+        private final HadoopInputSplit split;
+
+        /** Split length. */
+        private final long len;
+
+        /**
+         * Constructor.
+         *
+         * @param id Unique ID.
+         * @param split Split.
+         * @param len Split length.
+         */
+        public SplitSortWrapper(int id, HadoopInputSplit split, long len) {
+            this.id = id;
+            this.split = split;
+            this.len = len;
+        }
+
+        /** {@inheritDoc} */
+        @SuppressWarnings("NullableProblems")
+        @Override public int compareTo(SplitSortWrapper other) {
+            assert other != null;
+
+            // Arguments are swapped to get descending order by length; compare() methods cannot overflow,
+            // unlike subtraction.
+            int res = Long.compare(other.len, len);
+
+            return res != 0 ? res : Integer.compare(id, other.id);
+        }
+
+        /** {@inheritDoc} */
+        @Override public int hashCode() {
+            return id;
+        }
+
+        /** {@inheritDoc} */
+        @Override public boolean equals(Object obj) {
+            return obj instanceof SplitSortWrapper && id == ((SplitSortWrapper)obj).id;
+        }
+    }
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
new file mode 100644
index 0000000..f01f72b
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopAbstractMapReducePlanner.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.Ignite;
+import org.apache.ignite.IgniteLogger;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
+import org.apache.ignite.internal.util.typedef.internal.U;
+import org.apache.ignite.resources.IgniteInstanceResource;
+import org.apache.ignite.resources.LoggerResource;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.ignite.internal.IgniteNodeAttributes.ATTR_MACS;
+
+/**
+ * Base class for map-reduce planners.
+ */
+public abstract class HadoopAbstractMapReducePlanner implements HadoopMapReducePlanner {
+ /** Injected grid. */
+ @IgniteInstanceResource
+ protected Ignite ignite;
+
+ /** Logger. */
+ @SuppressWarnings("UnusedDeclaration")
+ @LoggerResource
+ protected IgniteLogger log;
+
+ /**
+ * Create plan topology.
+ *
+ * @param nodes Topology nodes.
+ * @return Plan topology.
+ */
+ protected static HadoopMapReducePlanTopology topology(Collection<ClusterNode> nodes) {
+ Map<String, HadoopMapReducePlanGroup> macsMap = new HashMap<>(nodes.size());
+
+ Map<UUID, HadoopMapReducePlanGroup> idToGrp = new HashMap<>(nodes.size());
+ Map<String, HadoopMapReducePlanGroup> hostToGrp = new HashMap<>(nodes.size());
+
+ for (ClusterNode node : nodes) {
+ String macs = node.attribute(ATTR_MACS);
+
+ HadoopMapReducePlanGroup grp = macsMap.get(macs);
+
+ if (grp == null) {
+ grp = new HadoopMapReducePlanGroup(node, macs);
+
+ macsMap.put(macs, grp);
+ }
+ else
+ grp.add(node);
+
+ idToGrp.put(node.id(), grp);
+
+ for (String host : node.addresses()) {
+ HadoopMapReducePlanGroup hostGrp = hostToGrp.get(host);
+
+ if (hostGrp == null)
+ hostToGrp.put(host, grp);
+ else
+ assert hostGrp == grp;
+ }
+ }
+
+ return new HadoopMapReducePlanTopology(new ArrayList<>(macsMap.values()), idToGrp, hostToGrp);
+ }
+
+
+ /**
+ * Groups nodes by host names.
+ *
+ * @param top Topology to group.
+ * @return Map.
+ */
+ protected static Map<String, Collection<UUID>> groupByHost(Collection<ClusterNode> top) {
+ Map<String, Collection<UUID>> grouped = U.newHashMap(top.size());
+
+ for (ClusterNode node : top) {
+ for (String host : node.hostNames()) {
+ Collection<UUID> nodeIds = grouped.get(host);
+
+ if (nodeIds == null) {
+ // Expecting 1-2 nodes per host.
+ nodeIds = new ArrayList<>(2);
+
+ grouped.put(host, nodeIds);
+ }
+
+ nodeIds.add(node.id());
+ }
+ }
+
+ return grouped;
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
new file mode 100644
index 0000000..2fe8682
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanGroup.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.internal.util.typedef.internal.S;
+
+import java.util.ArrayList;
+import java.util.UUID;
+
+/**
+ * Map-reduce plan group of nodes on a single physical machine.
+ */
+public class HadoopMapReducePlanGroup {
+ /** Node. */
+ private ClusterNode node;
+
+ /** Nodes. */
+ private ArrayList<ClusterNode> nodes;
+
+ /** MAC addresses. */
+ private final String macs;
+
+ /** Weight. */
+ private int weight;
+
+ /**
+ * Constructor.
+ *
+ * @param node First node in the group.
+ * @param macs MAC addresses.
+ */
+ public HadoopMapReducePlanGroup(ClusterNode node, String macs) {
+ assert node != null;
+ assert macs != null;
+
+ this.node = node;
+ this.macs = macs;
+ }
+
+ /**
+ * Add node to the group.
+ *
+ * @param newNode New node.
+ */
+ public void add(ClusterNode newNode) {
+ if (node != null) {
+ nodes = new ArrayList<>(2);
+
+ nodes.add(node);
+
+ node = null;
+ }
+
+ nodes.add(newNode);
+ }
+
+ /**
+ * @return MAC addresses.
+ */
+ public String macs() {
+ return macs;
+ }
+
+ /**
+ * @return {@code True} if only sinle node present.
+ */
+ public boolean single() {
+ return nodeCount() == 1;
+ }
+
+ /**
+ * Get node ID by index.
+ *
+ * @param idx Index.
+ * @return Node.
+ */
+ public UUID nodeId(int idx) {
+ ClusterNode res;
+
+ if (node != null) {
+ assert idx == 0;
+
+ res = node;
+ }
+ else {
+ assert nodes != null;
+ assert idx < nodes.size();
+
+ res = nodes.get(idx);
+ }
+
+ assert res != null;
+
+ return res.id();
+ }
+
+ /**
+ * @return Node count.
+ */
+ public int nodeCount() {
+ return node != null ? 1 : nodes.size();
+ }
+
+ /**
+ * @return weight.
+ */
+ public int weight() {
+ return weight;
+ }
+
+ /**
+ * @param weight weight.
+ */
+ public void weight(int weight) {
+ this.weight = weight;
+ }
+
+
+ /** {@inheritDoc} */
+ @Override public int hashCode() {
+ return macs.hashCode();
+ }
+
+ /** {@inheritDoc} */
+ @Override public boolean equals(Object obj) {
+ return obj instanceof HadoopMapReducePlanGroup && F.eq(macs, ((HadoopMapReducePlanGroup)obj).macs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public String toString() {
+ return S.toString(HadoopMapReducePlanGroup.class, this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/73649386/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
new file mode 100644
index 0000000..fa5c469
--- /dev/null
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/planner/HadoopMapReducePlanTopology.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.planner;
+
+import org.apache.ignite.internal.util.typedef.internal.S;
+import org.jetbrains.annotations.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+/**
+ * Map-reduce plan topology: the list of node groups together with lookup maps
+ * from node ID and from host to the owning group.
+ */
+public class HadoopMapReducePlanTopology {
+    /** All groups. */
+    private final List<HadoopMapReducePlanGroup> grps;
+
+    /** Node ID to group map. */
+    private final Map<UUID, HadoopMapReducePlanGroup> idToGrp;
+
+    /** Host to group map. */
+    private final Map<String, HadoopMapReducePlanGroup> hostToGrp;
+
+    /**
+     * Creates topology from already prepared collections; they are stored as passed, without copying.
+     *
+     * @param grps All groups.
+     * @param idToGrp ID to group map.
+     * @param hostToGrp Host to group map.
+     */
+    public HadoopMapReducePlanTopology(List<HadoopMapReducePlanGroup> grps,
+        Map<UUID, HadoopMapReducePlanGroup> idToGrp, Map<String, HadoopMapReducePlanGroup> hostToGrp) {
+        assert grps != null;
+
+        this.grps = grps;
+
+        assert idToGrp != null;
+
+        this.idToGrp = idToGrp;
+
+        assert hostToGrp != null;
+
+        this.hostToGrp = hostToGrp;
+    }
+
+    /**
+     * @return All groups.
+     */
+    public List<HadoopMapReducePlanGroup> groups() {
+        return this.grps;
+    }
+
+    /**
+     * Looks up the group registered for the given node ID.
+     *
+     * @param id Node ID.
+     * @return Group mapped to the ID.
+     */
+    public HadoopMapReducePlanGroup groupForId(UUID id) {
+        HadoopMapReducePlanGroup grp = this.idToGrp.get(id);
+
+        return grp;
+    }
+
+    /**
+     * Looks up the group registered for the given host.
+     *
+     * @param host Host.
+     * @return Group mapped to the host, or {@code null} if there is none.
+     */
+    @Nullable public HadoopMapReducePlanGroup groupForHost(String host) {
+        HadoopMapReducePlanGroup grp = this.hostToGrp.get(host);
+
+        return grp;
+    }
+
+    /** {@inheritDoc} */
+    @Override public String toString() {
+        return S.toString(HadoopMapReducePlanTopology.class, this);
+    }
+}