You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/12/29 08:07:31 UTC
ignite git commit: IGNITE-4459: Hadoop: weighted planned is default
one from now on. This closes #1391.
Repository: ignite
Updated Branches:
refs/heads/ignite-2.0 2e691d80e -> a9b1fc2b3
IGNITE-4459: Hadoop: weighted planned is default one from now on. This closes #1391.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/a9b1fc2b
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/a9b1fc2b
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/a9b1fc2b
Branch: refs/heads/ignite-2.0
Commit: a9b1fc2b3840d47d7c978d9296e8ae6bdeb10be5
Parents: 2e691d8
Author: tledkov-gridgain <tl...@gridgain.com>
Authored: Thu Dec 29 11:07:22 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Thu Dec 29 11:07:22 2016 +0300
----------------------------------------------------------------------
.../mapreduce/IgniteHadoopMapReducePlanner.java | 414 -------------
.../processors/hadoop/HadoopProcessor.java | 4 +-
.../HadoopDefaultMapReducePlannerSelfTest.java | 619 -------------------
.../testsuites/IgniteHadoopTestSuite.java | 2 -
4 files changed, 2 insertions(+), 1037 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/a9b1fc2b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java b/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
deleted file mode 100644
index ac42381..0000000
--- a/modules/hadoop/src/main/java/org/apache/ignite/hadoop/mapreduce/IgniteHadoopMapReducePlanner.java
+++ /dev/null
@@ -1,414 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.hadoop.mapreduce;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteException;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.IgniteEx;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopJob;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.igfs.HadoopIgfsEndpoint;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopDefaultMapReducePlan;
-import org.apache.ignite.internal.processors.igfs.IgfsEx;
-import org.apache.ignite.internal.util.typedef.F;
-import org.jetbrains.annotations.NotNull;
-import org.jetbrains.annotations.Nullable;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Map;
-import java.util.UUID;
-
-import static org.apache.ignite.IgniteFileSystem.IGFS_SCHEME;
-
-/**
- * Default map-reduce planner implementation.
- */
-public class IgniteHadoopMapReducePlanner extends HadoopAbstractMapReducePlanner {
- /** {@inheritDoc} */
- @Override public HadoopMapReducePlan preparePlan(HadoopJob job, Collection<ClusterNode> top,
- @Nullable HadoopMapReducePlan oldPlan) throws IgniteCheckedException {
- // Convert collection of topology nodes to collection of topology node IDs.
- Collection<UUID> topIds = new HashSet<>(top.size(), 1.0f);
-
- for (ClusterNode topNode : top)
- topIds.add(topNode.id());
-
- Map<UUID, Collection<HadoopInputSplit>> mappers = mappers(top, topIds, job.input());
-
- int rdcCnt = job.info().reducers();
-
- if (rdcCnt < 0)
- throw new IgniteCheckedException("Number of reducers must be non-negative, actual: " + rdcCnt);
-
- Map<UUID, int[]> reducers = reducers(top, mappers, rdcCnt);
-
- return new HadoopDefaultMapReducePlan(mappers, reducers);
- }
-
- /**
- * Create plan for mappers.
- *
- * @param top Topology nodes.
- * @param topIds Topology node IDs.
- * @param splits Splits.
- * @return Mappers map.
- * @throws IgniteCheckedException If failed.
- */
- private Map<UUID, Collection<HadoopInputSplit>> mappers(Collection<ClusterNode> top, Collection<UUID> topIds,
- Iterable<HadoopInputSplit> splits) throws IgniteCheckedException {
- Map<UUID, Collection<HadoopInputSplit>> mappers = new HashMap<>();
-
- Map<String, Collection<UUID>> nodes = groupByHost(top);
-
- Map<UUID, Integer> nodeLoads = new HashMap<>(top.size(), 1.0f); // Track node load.
-
- for (UUID nodeId : topIds)
- nodeLoads.put(nodeId, 0);
-
- for (HadoopInputSplit split : splits) {
- UUID nodeId = nodeForSplit(split, topIds, nodes, nodeLoads);
-
- if (log.isDebugEnabled())
- log.debug("Mapped split to node [split=" + split + ", nodeId=" + nodeId + ']');
-
- Collection<HadoopInputSplit> nodeSplits = mappers.get(nodeId);
-
- if (nodeSplits == null) {
- nodeSplits = new ArrayList<>();
-
- mappers.put(nodeId, nodeSplits);
- }
-
- nodeSplits.add(split);
-
- // Updated node load.
- nodeLoads.put(nodeId, nodeLoads.get(nodeId) + 1);
- }
-
- return mappers;
- }
-
- /**
- * Determine the best node for this split.
- *
- * @param split Split.
- * @param topIds Topology node IDs.
- * @param nodes Nodes.
- * @param nodeLoads Node load tracker.
- * @return Node ID.
- * @throws IgniteCheckedException On error.
- */
- @SuppressWarnings("unchecked")
- private UUID nodeForSplit(HadoopInputSplit split, Collection<UUID> topIds, Map<String, Collection<UUID>> nodes,
- Map<UUID, Integer> nodeLoads) throws IgniteCheckedException {
- if (split instanceof HadoopFileBlock) {
- HadoopFileBlock split0 = (HadoopFileBlock)split;
-
- if (IGFS_SCHEME.equalsIgnoreCase(split0.file().getScheme())) {
- HadoopIgfsEndpoint endpoint = new HadoopIgfsEndpoint(split0.file().getAuthority());
-
- IgfsEx igfs = (IgfsEx)((IgniteEx)ignite).igfsx(endpoint.igfs());
-
- if (igfs != null && !igfs.isProxy(split0.file())) {
- IgfsPath path = new IgfsPath(split0.file());
-
- if (igfs.exists(path)) {
- Collection<IgfsBlockLocation> blocks;
-
- try {
- blocks = igfs.affinity(path, split0.start(), split0.length());
- }
- catch (IgniteException e) {
- throw new IgniteCheckedException(e);
- }
-
- assert blocks != null;
-
- if (blocks.size() == 1)
- // Fast-path, split consists of one IGFS block (as in most cases).
- return bestNode(blocks.iterator().next().nodeIds(), topIds, nodeLoads, false);
- else {
- // Slow-path, file consists of multiple IGFS blocks. First, find the most co-located nodes.
- Map<UUID, Long> nodeMap = new HashMap<>();
-
- List<UUID> bestNodeIds = null;
- long bestLen = -1L;
-
- for (IgfsBlockLocation block : blocks) {
- for (UUID blockNodeId : block.nodeIds()) {
- if (topIds.contains(blockNodeId)) {
- Long oldLen = nodeMap.get(blockNodeId);
- long newLen = oldLen == null ? block.length() : oldLen + block.length();
-
- nodeMap.put(blockNodeId, newLen);
-
- if (bestNodeIds == null || bestLen < newLen) {
- bestNodeIds = new ArrayList<>(1);
-
- bestNodeIds.add(blockNodeId);
-
- bestLen = newLen;
- }
- else if (bestLen == newLen) {
- assert !F.isEmpty(bestNodeIds);
-
- bestNodeIds.add(blockNodeId);
- }
- }
- }
- }
-
- if (bestNodeIds != null) {
- return bestNodeIds.size() == 1 ? bestNodeIds.get(0) :
- bestNode(bestNodeIds, topIds, nodeLoads, true);
- }
- }
- }
- }
- }
- }
-
- // Cannot use local IGFS for some reason, try selecting the node by host.
- Collection<UUID> blockNodes = null;
-
- for (String host : split.hosts()) {
- Collection<UUID> hostNodes = nodes.get(host);
-
- if (!F.isEmpty(hostNodes)) {
- if (blockNodes == null)
- blockNodes = new ArrayList<>(hostNodes);
- else
- blockNodes.addAll(hostNodes);
- }
- }
-
- return bestNode(blockNodes, topIds, nodeLoads, false);
- }
-
- /**
- * Finds the best (the least loaded) node among the candidates.
- *
- * @param candidates Candidates.
- * @param topIds Topology node IDs.
- * @param nodeLoads Known node loads.
- * @param skipTopCheck Whether to skip topology check.
- * @return The best node.
- */
- private UUID bestNode(@Nullable Collection<UUID> candidates, Collection<UUID> topIds, Map<UUID, Integer> nodeLoads,
- boolean skipTopCheck) {
- UUID bestNode = null;
- int bestLoad = Integer.MAX_VALUE;
-
- if (candidates != null) {
- for (UUID candidate : candidates) {
- if (skipTopCheck || topIds.contains(candidate)) {
- int load = nodeLoads.get(candidate);
-
- if (bestNode == null || bestLoad > load) {
- bestNode = candidate;
- bestLoad = load;
-
- if (bestLoad == 0)
- break; // Minimum load possible, no need for further iterations.
- }
- }
- }
- }
-
- if (bestNode == null) {
- // Blocks are located on nodes which are not Hadoop-enabled, assign to the least loaded one.
- bestLoad = Integer.MAX_VALUE;
-
- for (UUID nodeId : topIds) {
- int load = nodeLoads.get(nodeId);
-
- if (bestNode == null || bestLoad > load) {
- bestNode = nodeId;
- bestLoad = load;
-
- if (bestLoad == 0)
- break; // Minimum load possible, no need for further iterations.
- }
- }
- }
-
- assert bestNode != null;
-
- return bestNode;
- }
-
- /**
- * Create plan for reducers.
- *
- * @param top Topology.
- * @param mappers Mappers map.
- * @param reducerCnt Reducers count.
- * @return Reducers map.
- */
- private Map<UUID, int[]> reducers(Collection<ClusterNode> top,
- Map<UUID, Collection<HadoopInputSplit>> mappers, int reducerCnt) {
- // Determine initial node weights.
- int totalWeight = 0;
-
- List<WeightedNode> nodes = new ArrayList<>(top.size());
-
- for (ClusterNode node : top) {
- Collection<HadoopInputSplit> split = mappers.get(node.id());
-
- int weight = reducerNodeWeight(node, split != null ? split.size() : 0);
-
- nodes.add(new WeightedNode(node.id(), weight, weight));
-
- totalWeight += weight;
- }
-
- // Adjust weights.
- int totalAdjustedWeight = 0;
-
- for (WeightedNode node : nodes) {
- node.floatWeight = ((float)node.weight * reducerCnt) / totalWeight;
-
- node.weight = Math.round(node.floatWeight);
-
- totalAdjustedWeight += node.weight;
- }
-
- // Apply redundant/lost reducers.
- Collections.sort(nodes);
-
- if (totalAdjustedWeight > reducerCnt) {
- // Too much reducers set.
- ListIterator<WeightedNode> iter = nodes.listIterator(nodes.size() - 1);
-
- while (totalAdjustedWeight != reducerCnt) {
- if (!iter.hasPrevious())
- iter = nodes.listIterator(nodes.size() - 1);
-
- WeightedNode node = iter.previous();
-
- if (node.weight > 0) {
- node.weight -= 1;
-
- totalAdjustedWeight--;
- }
- }
- }
- else if (totalAdjustedWeight < reducerCnt) {
- // Not enough reducers set.
- ListIterator<WeightedNode> iter = nodes.listIterator(0);
-
- while (totalAdjustedWeight != reducerCnt) {
- if (!iter.hasNext())
- iter = nodes.listIterator(0);
-
- WeightedNode node = iter.next();
-
- if (node.floatWeight > 0.0f) {
- node.weight += 1;
-
- totalAdjustedWeight++;
- }
- }
- }
-
- int idx = 0;
-
- Map<UUID, int[]> reducers = new HashMap<>(nodes.size(), 1.0f);
-
- for (WeightedNode node : nodes) {
- if (node.weight > 0) {
- int[] arr = new int[node.weight];
-
- for (int i = 0; i < arr.length; i++)
- arr[i] = idx++;
-
- reducers.put(node.nodeId, arr);
- }
- }
-
- return reducers;
- }
-
- /**
- * Calculate node weight based on node metrics and data co-location.
- *
- * @param node Node.
- * @param splitCnt Splits mapped to this node.
- * @return Node weight.
- */
- @SuppressWarnings("UnusedParameters")
- protected int reducerNodeWeight(ClusterNode node, int splitCnt) {
- return splitCnt;
- }
-
- /**
- * Weighted node.
- */
- private static class WeightedNode implements Comparable<WeightedNode> {
- /** Node ID. */
- private final UUID nodeId;
-
- /** Weight. */
- private int weight;
-
- /** Floating point weight. */
- private float floatWeight;
-
- /**
- * Constructor.
- *
- * @param nodeId Node ID.
- * @param weight Weight.
- * @param floatWeight Floating point weight.
- */
- private WeightedNode(UUID nodeId, int weight, float floatWeight) {
- this.nodeId = nodeId;
- this.weight = weight;
- this.floatWeight = floatWeight;
- }
-
- /** {@inheritDoc} */
- @Override public boolean equals(Object obj) {
- return obj != null && obj instanceof WeightedNode && F.eq(nodeId, ((WeightedNode)obj).nodeId);
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- return nodeId.hashCode();
- }
-
- /** {@inheritDoc} */
- @Override public int compareTo(@NotNull WeightedNode other) {
- float res = other.floatWeight - floatWeight;
-
- return res > 0.0f ? 1 : res < 0.0f ? -1 : nodeId.compareTo(other.nodeId);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a9b1fc2b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
index f0df1e9..329d67f 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/HadoopProcessor.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.processors.hadoop;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.configuration.HadoopConfiguration;
-import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.hadoop.counter.HadoopCounters;
@@ -243,7 +243,7 @@ public class HadoopProcessor extends HadoopProcessorAdapter {
*/
private void initializeDefaults(HadoopConfiguration cfg) {
if (cfg.getMapReducePlanner() == null)
- cfg.setMapReducePlanner(new IgniteHadoopMapReducePlanner());
+ cfg.setMapReducePlanner(new IgniteHadoopWeightedMapReducePlanner());
}
/** {@inheritDoc} */
http://git-wip-us.apache.org/repos/asf/ignite/blob/a9b1fc2b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java b/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
deleted file mode 100644
index ee1c88f..0000000
--- a/modules/hadoop/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopDefaultMapReducePlannerSelfTest.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.processors.hadoop.impl;
-
-import org.apache.ignite.IgniteCheckedException;
-import org.apache.ignite.IgniteFileSystem;
-import org.apache.ignite.cluster.ClusterNode;
-import org.apache.ignite.hadoop.mapreduce.IgniteHadoopMapReducePlanner;
-import org.apache.ignite.igfs.IgfsBlockLocation;
-import org.apache.ignite.igfs.IgfsPath;
-import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
-import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
-import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlanner;
-import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
-import org.apache.ignite.internal.processors.igfs.IgfsBlockLocationImpl;
-import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
-import org.apache.ignite.internal.processors.igfs.IgfsMock;
-import org.apache.ignite.internal.util.typedef.F;
-import org.apache.ignite.testframework.GridTestNode;
-import org.apache.ignite.testframework.GridTestUtils;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-
-/**
- *
- */
-public class HadoopDefaultMapReducePlannerSelfTest extends HadoopAbstractSelfTest {
- /** */
- private static final UUID ID_1 = new UUID(0, 1);
-
- /** */
- private static final UUID ID_2 = new UUID(0, 2);
-
- /** */
- private static final UUID ID_3 = new UUID(0, 3);
-
- /** */
- private static final String HOST_1 = "host1";
-
- /** */
- private static final String HOST_2 = "host2";
-
- /** */
- private static final String HOST_3 = "host3";
-
- /** */
- private static final String INVALID_HOST_1 = "invalid_host1";
-
- /** */
- private static final String INVALID_HOST_2 = "invalid_host2";
-
- /** */
- private static final String INVALID_HOST_3 = "invalid_host3";
-
- /** Mocked IGFS. */
- private static final IgniteFileSystem IGFS = new MockIgfs();
-
- /** Mocked Grid. */
- private static final IgfsIgniteMock GRID = new IgfsIgniteMock(null, IGFS);
-
- /** Planner. */
- private static final HadoopMapReducePlanner PLANNER = new IgniteHadoopMapReducePlanner();
-
- /** Block locations. */
- private static final Map<Block, Collection<IgfsBlockLocation>> BLOCK_MAP = new HashMap<>();
-
- /** Proxy map. */
- private static final Map<URI, Boolean> PROXY_MAP = new HashMap<>();
-
- /** Last created plan. */
- private static final ThreadLocal<HadoopMapReducePlan> PLAN = new ThreadLocal<>();
-
- /**
- * Static initializer.
- */
- static {
- GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "ignite", GRID);
- }
-
- /** {@inheritDoc} */
- @Override protected void beforeTest() throws Exception {
- GridTestUtils.setFieldValue(PLANNER, HadoopAbstractMapReducePlanner.class, "log", log());
-
- BLOCK_MAP.clear();
- PROXY_MAP.clear();
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testIgfsOneBlockPerNode() throws IgniteCheckedException {
- HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1);
- HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_2);
- HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_3);
-
- mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1));
- mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_2));
- mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_3));
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 2);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2, split3);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureMappers(ID_3, split3);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureMappers(ID_3, split3);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testNonIgfsOneBlockPerNode() throws IgniteCheckedException {
- HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1);
- HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_2);
- HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_3);
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 2);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) || ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2, split3);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureMappers(ID_3, split3);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureMappers(ID_3, split3);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
- HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2);
- HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2);
- HadoopFileBlock split3 = split(true, "/file3", 0, 100, HOST_1, HOST_3);
-
- mapIgfsBlock(split1.file(), 0, 100, location(0, 100, ID_1, ID_2));
- mapIgfsBlock(split2.file(), 0, 100, location(0, 100, ID_1, ID_2));
- mapIgfsBlock(split3.file(), 0, 100, location(0, 100, ID_1, ID_3));
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2, split3);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testNonIgfsSeveralBlocksPerNode() throws IgniteCheckedException {
- HadoopFileBlock split1 = split(false, "/file1", 0, 100, HOST_1, HOST_2);
- HadoopFileBlock split2 = split(false, "/file2", 0, 100, HOST_1, HOST_2);
- HadoopFileBlock split3 = split(false, "/file3", 0, 100, HOST_1, HOST_3);
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) || ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
-
- plan(3, split1, split2, split3);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testIgfsSeveralComplexBlocksPerNode() throws IgniteCheckedException {
- HadoopFileBlock split1 = split(true, "/file1", 0, 100, HOST_1, HOST_2, HOST_3);
- HadoopFileBlock split2 = split(true, "/file2", 0, 100, HOST_1, HOST_2, HOST_3);
-
- mapIgfsBlock(split1.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_1, ID_3));
- mapIgfsBlock(split2.file(), 0, 100, location(0, 50, ID_1, ID_2), location(51, 100, ID_2, ID_3));
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureEmpty(ID_2);
- assert ensureEmpty(ID_3);
-
- plan(1, split2);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_1);
- assert ensureEmpty(ID_3);
-
- plan(1, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) || ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0);
- assert ensureEmpty(ID_3);
-
- plan(2, split1, split2);
- assert ensureMappers(ID_1, split1);
- assert ensureMappers(ID_2, split2);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureEmpty(ID_3);
- }
-
- /**
- * @throws IgniteCheckedException If failed.
- */
- public void testNonIgfsOrphans() throws IgniteCheckedException {
- HadoopFileBlock split1 = split(false, "/file1", 0, 100, INVALID_HOST_1, INVALID_HOST_2);
- HadoopFileBlock split2 = split(false, "/file2", 0, 100, INVALID_HOST_1, INVALID_HOST_3);
- HadoopFileBlock split3 = split(false, "/file3", 0, 100, INVALID_HOST_2, INVALID_HOST_3);
-
- plan(1, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 1) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 1) && ensureEmpty(ID_3) ||
- ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 1);
-
- plan(2, split1);
- assert ensureMappers(ID_1, split1) && ensureReducers(ID_1, 2) && ensureEmpty(ID_2) && ensureEmpty(ID_3) ||
- ensureEmpty(ID_1) && ensureMappers(ID_2, split1) && ensureReducers(ID_2, 2) && ensureEmpty(ID_3) ||
- ensureEmpty(ID_1) && ensureEmpty(ID_2) && ensureMappers(ID_3, split1) && ensureReducers(ID_3, 2);
-
- plan(1, split1, split2, split3);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 0) ||
- ensureReducers(ID_1, 0) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 0) ||
- ensureReducers(ID_1, 0) && ensureReducers(ID_2, 0) && ensureReducers(ID_3, 1);
-
- plan(3, split1, split2, split3);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
- assert ensureReducers(ID_1, 1);
- assert ensureReducers(ID_2, 1);
- assert ensureReducers(ID_3, 1);
-
- plan(5, split1, split2, split3);
- assert ensureMappers(ID_1, split1) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split1) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split3) ||
- ensureMappers(ID_1, split2) && ensureMappers(ID_2, split3) && ensureMappers(ID_3, split1) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split1) && ensureMappers(ID_3, split2) ||
- ensureMappers(ID_1, split3) && ensureMappers(ID_2, split2) && ensureMappers(ID_3, split1);
- assert ensureReducers(ID_1, 1) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 1) && ensureReducers(ID_3, 2) ||
- ensureReducers(ID_1, 2) && ensureReducers(ID_2, 2) && ensureReducers(ID_3, 1);
- }
-
- /**
- * Create plan.
- *
- * @param reducers Reducers count.
- * @param splits Splits.
- * @return Plan.
- * @throws IgniteCheckedException If failed.
- */
- private static HadoopMapReducePlan plan(int reducers, HadoopInputSplit... splits) throws IgniteCheckedException {
- assert reducers > 0;
- assert splits != null && splits.length > 0;
-
- Collection<HadoopInputSplit> splitList = new ArrayList<>(splits.length);
-
- Collections.addAll(splitList, splits);
-
- Collection<ClusterNode> top = new ArrayList<>();
-
- GridTestNode node1 = new GridTestNode(ID_1);
- GridTestNode node2 = new GridTestNode(ID_2);
- GridTestNode node3 = new GridTestNode(ID_3);
-
- node1.setHostName(HOST_1);
- node2.setHostName(HOST_2);
- node3.setHostName(HOST_3);
-
- top.add(node1);
- top.add(node2);
- top.add(node3);
-
- HadoopMapReducePlan plan = PLANNER.preparePlan(new HadoopPlannerMockJob(splitList, reducers), top, null);
-
- PLAN.set(plan);
-
- return plan;
- }
-
- /**
- * Ensure that node contains the given mappers.
- *
- * @param nodeId Node ID.
- * @param expSplits Expected splits.
- * @return {@code True} if this assumption is valid.
- */
- private static boolean ensureMappers(UUID nodeId, HadoopInputSplit... expSplits) {
- Collection<HadoopInputSplit> expSplitsCol = new ArrayList<>();
-
- Collections.addAll(expSplitsCol, expSplits);
-
- Collection<HadoopInputSplit> splits = PLAN.get().mappers(nodeId);
-
- return F.eq(expSplitsCol, splits);
- }
-
- /**
- * Ensure that node contains the given amount of reducers.
- *
- * @param nodeId Node ID.
- * @param reducers Reducers.
- * @return {@code True} if this assumption is valid.
- */
- private static boolean ensureReducers(UUID nodeId, int reducers) {
- int[] reducersArr = PLAN.get().reducers(nodeId);
-
- return reducers == 0 ? F.isEmpty(reducersArr) : (reducersArr != null && reducersArr.length == reducers);
- }
-
- /**
- * Ensure that no mappers and reducers is located on this node.
- *
- * @param nodeId Node ID.
- * @return {@code True} if this assumption is valid.
- */
- private static boolean ensureEmpty(UUID nodeId) {
- return F.isEmpty(PLAN.get().mappers(nodeId)) && F.isEmpty(PLAN.get().reducers(nodeId));
- }
-
- /**
- * Create split.
- *
- * @param igfs IGFS flag.
- * @param file File.
- * @param start Start.
- * @param len Length.
- * @param hosts Hosts.
- * @return Split.
- */
- private static HadoopFileBlock split(boolean igfs, String file, long start, long len, String... hosts) {
- URI uri = URI.create((igfs ? "igfs://igfs@" : "hdfs://") + file);
-
- return new HadoopFileBlock(hosts, uri, start, len);
- }
-
- /**
- * Create block location.
- *
- * @param start Start.
- * @param len Length.
- * @param nodeIds Node IDs.
- * @return Block location.
- */
- private static IgfsBlockLocation location(long start, long len, UUID... nodeIds) {
- assert nodeIds != null && nodeIds.length > 0;
-
- Collection<ClusterNode> nodes = new ArrayList<>(nodeIds.length);
-
- for (UUID id : nodeIds)
- nodes.add(new GridTestNode(id));
-
- return new IgfsBlockLocationImpl(start, len, nodes);
- }
-
- /**
- * Map IGFS block to nodes.
- *
- * @param file File.
- * @param start Start.
- * @param len Length.
- * @param locations Locations.
- */
- private static void mapIgfsBlock(URI file, long start, long len, IgfsBlockLocation... locations) {
- assert locations != null && locations.length > 0;
-
- IgfsPath path = new IgfsPath(file);
-
- Block block = new Block(path, start, len);
-
- Collection<IgfsBlockLocation> locationsList = new ArrayList<>();
-
- Collections.addAll(locationsList, locations);
-
- BLOCK_MAP.put(block, locationsList);
- }
-
- /**
- * Block.
- */
- private static class Block {
- /** */
- private final IgfsPath path;
-
- /** */
- private final long start;
-
- /** */
- private final long len;
-
- /**
- * Constructor.
- *
- * @param path Path.
- * @param start Start.
- * @param len Length.
- */
- private Block(IgfsPath path, long start, long len) {
- this.path = path;
- this.start = start;
- this.len = len;
- }
-
- /** {@inheritDoc} */
- @SuppressWarnings("RedundantIfStatement")
- @Override public boolean equals(Object o) {
- if (this == o) return true;
- if (!(o instanceof Block)) return false;
-
- Block block = (Block) o;
-
- if (len != block.len)
- return false;
-
- if (start != block.start)
- return false;
-
- if (!path.equals(block.path))
- return false;
-
- return true;
- }
-
- /** {@inheritDoc} */
- @Override public int hashCode() {
- int res = path.hashCode();
-
- res = 31 * res + (int) (start ^ (start >>> 32));
- res = 31 * res + (int) (len ^ (len >>> 32));
-
- return res;
- }
- }
-
- /**
- * Mocked IGFS.
- */
- private static class MockIgfs extends IgfsMock {
- /**
- * Constructor.
- */
- public MockIgfs() {
- super("igfs");
- }
-
- /** {@inheritDoc} */
- @Override public boolean isProxy(URI path) {
- return PROXY_MAP.containsKey(path) && PROXY_MAP.get(path);
- }
-
- /** {@inheritDoc} */
- @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
- return BLOCK_MAP.get(new Block(path, start, len));
- }
-
- /** {@inheritDoc} */
- @Override public boolean exists(IgfsPath path) {
- return true;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/ignite/blob/a9b1fc2b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
index 01893fb..6c2d5c4 100644
--- a/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
+++ b/modules/hadoop/src/test/java/org/apache/ignite/testsuites/IgniteHadoopTestSuite.java
@@ -55,7 +55,6 @@ import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSy
import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemLoopbackExternalSecondarySelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.igfs.IgniteHadoopFileSystemSecondaryFileSystemInitializationSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopCommandLineTest;
-import org.apache.ignite.internal.processors.hadoop.impl.HadoopDefaultMapReducePlannerSelfTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopFileSystemsTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopGroupingTest;
import org.apache.ignite.internal.processors.hadoop.impl.HadoopJobTrackerSelfTest;
@@ -116,7 +115,6 @@ public class IgniteHadoopTestSuite extends TestSuite {
suite.addTest(new TestSuite(ldr.loadClass(HadoopUserLibsSelfTest.class.getName())));
- suite.addTest(new TestSuite(ldr.loadClass(HadoopDefaultMapReducePlannerSelfTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(HadoopWeightedMapReducePlannerTest.class.getName())));
suite.addTest(new TestSuite(ldr.loadClass(BasicUserNameMapperSelfTest.class.getName())));