You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/09/22 07:00:55 UTC
[15/51] [partial] ignite git commit: IGNITE-3949: Applied new
HadoopClassLoader architecture.
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
new file mode 100644
index 0000000..430c675
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedMapReducePlannerTest.java
@@ -0,0 +1,602 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.cluster.ClusterMetrics;
+import org.apache.ignite.cluster.ClusterNode;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+import org.apache.ignite.igfs.IgfsBlockLocation;
+import org.apache.ignite.igfs.IgfsPath;
+import org.apache.ignite.internal.IgniteNodeAttributes;
+import org.apache.ignite.internal.processors.hadoop.HadoopFileBlock;
+import org.apache.ignite.internal.processors.hadoop.HadoopInputSplit;
+import org.apache.ignite.internal.processors.hadoop.HadoopMapReducePlan;
+import org.apache.ignite.internal.processors.hadoop.planner.HadoopAbstractMapReducePlanner;
+import org.apache.ignite.internal.processors.igfs.IgfsIgniteMock;
+import org.apache.ignite.internal.processors.igfs.IgfsMock;
+import org.apache.ignite.internal.util.typedef.F;
+import org.apache.ignite.lang.IgniteProductVersion;
+import org.apache.ignite.testframework.GridTestUtils;
+import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
+import org.jetbrains.annotations.Nullable;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+/**
+ * Tests for weighted map-reduce planner.
+ */
+public class HadoopWeightedMapReducePlannerTest extends GridCommonAbstractTest {
+ /** ID of standard node 1. */
+ private static final UUID ID_1 = new UUID(0, 1);
+
+ /** ID of standard node 2. */
+ private static final UUID ID_2 = new UUID(0, 2);
+
+ /** ID of standard node 3. */
+ private static final UUID ID_3 = new UUID(0, 3);
+
+ /** MACs attribute value of standard node 1. */
+ private static final String MAC_1 = "mac1";
+
+ /** MACs attribute value of standard node 2. */
+ private static final String MAC_2 = "mac2";
+
+ /** MACs attribute value of standard node 3. */
+ private static final String MAC_3 = "mac3";
+
+ /** Host 1: address of standard node 1. */
+ private static final String HOST_1 = "host1";
+
+ /** Host 2: address of standard node 2. */
+ private static final String HOST_2 = "host2";
+
+ /** Host 3: address of standard node 3. */
+ private static final String HOST_3 = "host3";
+
+ /** Host 4: not an address of any standard node. */
+ private static final String HOST_4 = "host4";
+
+ /** Host 5: not an address of any standard node. */
+ private static final String HOST_5 = "host5";
+
+ /** Standard node 1. */
+ private static final MockNode NODE_1 = new MockNode(ID_1, MAC_1, HOST_1);
+
+ /** Standard node 2. */
+ private static final MockNode NODE_2 = new MockNode(ID_2, MAC_2, HOST_2);
+
+ /** Standard node 3. */
+ private static final MockNode NODE_3 = new MockNode(ID_3, MAC_3, HOST_3);
+
+ /** Standard nodes: {@link #NODE_1}, {@link #NODE_2} and {@link #NODE_3}. */
+ private static final Collection<ClusterNode> NODES;
+
+ /**
+ * Static initializer: fills {@link #NODES} with the three standard nodes.
+ */
+ static {
+ NODES = new ArrayList<>();
+
+ NODES.add(NODE_1);
+ NODES.add(NODE_2);
+ NODES.add(NODE_3);
+ }
+
+ /**
+  * Test one IGFS split being assigned to affinity node.
+  *
+  * @throws Exception If failed.
+  */
+ public void testOneIgfsSplitAffinity() throws Exception {
+     IgfsMock igfs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+     List<HadoopInputSplit> splits = new ArrayList<>();
+
+     splits.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("igfs://igfs@/file"), 0, 50));
+
+     final int expReducers = 4;
+     HadoopPlannerMockJob job = new HadoopPlannerMockJob(splits, expReducers);
+
+     IgniteHadoopWeightedMapReducePlanner planner = createPlanner(igfs);
+
+     HadoopMapReducePlan plan = planner.preparePlan(job, NODES, null);
+
+     // The split [0, 50) lies entirely in the IGFS block owned by NODE_1, so its only mapper must run there:
+     assertEquals(1, plan.mappers());
+     assertEquals(1, plan.mapperNodeIds().size());
+     assertTrue(plan.mapperNodeIds().contains(ID_1));
+
+     checkPlanMappers(plan, splits, NODES, false/*only 1 split*/);
+     checkPlanReducers(plan, NODES, expReducers, false/* because of threshold behavior.*/);
+ }
+
+ /**
+  * Tests planning of HDFS splits where every split is located on exactly one host.
+  *
+  * @throws Exception If failed.
+  */
+ public void testHdfsSplitsAffinity() throws Exception {
+     IgfsMock fs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+     final List<HadoopInputSplit> blocks = new ArrayList<>();
+
+     blocks.add(new HadoopFileBlock(new String[] { HOST_1 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
+     blocks.add(new HadoopFileBlock(new String[] { HOST_2 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
+     blocks.add(new HadoopFileBlock(new String[] { HOST_3 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
+
+     // HOST_4 and HOST_5 are not addresses of any node in the Ignite topology,
+     // so these splits should be assigned to whichever nodes are least loaded:
+     blocks.add(new HadoopFileBlock(new String[] { HOST_4 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
+     blocks.add(new HadoopFileBlock(new String[] { HOST_5 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
+
+     final int reducerCnt = 7;
+
+     HadoopPlannerMockJob mockJob = new HadoopPlannerMockJob(blocks, reducerCnt);
+
+     IgniteHadoopWeightedMapReducePlanner weightedPlanner = createPlanner(fs);
+
+     final HadoopMapReducePlan res = weightedPlanner.preparePlan(mockJob, NODES, null);
+
+     checkPlanMappers(res, blocks, NODES, true);
+
+     checkPlanReducers(res, NODES, reducerCnt, true);
+ }
+
+ /**
+  * Tests planning of HDFS splits where every split is replicated on three hosts.
+  *
+  * @throws Exception If failed.
+  */
+ public void testHdfsSplitsReplication() throws Exception {
+     IgfsMock fs = LocationsBuilder.create().add(0, NODE_1).add(50, NODE_2).add(100, NODE_3).buildIgfs();
+
+     final List<HadoopInputSplit> blocks = new ArrayList<>();
+
+     blocks.add(new HadoopFileBlock(new String[] { HOST_1, HOST_2, HOST_3 }, URI.create("hfds://" + HOST_1 + "/x"), 0, 50));
+     blocks.add(new HadoopFileBlock(new String[] { HOST_2, HOST_3, HOST_4 }, URI.create("hfds://" + HOST_2 + "/x"), 50, 100));
+     blocks.add(new HadoopFileBlock(new String[] { HOST_3, HOST_4, HOST_5 }, URI.create("hfds://" + HOST_3 + "/x"), 100, 37));
+     // The first-listed hosts of the following splits (HOST_4, HOST_5) are not in the Ignite topology,
+     // but each of these splits still has at least one replica on a topology host:
+     blocks.add(new HadoopFileBlock(new String[] { HOST_4, HOST_5, HOST_1 }, URI.create("hfds://" + HOST_4 + "/x"), 138, 2));
+     blocks.add(new HadoopFileBlock(new String[] { HOST_5, HOST_1, HOST_2 }, URI.create("hfds://" + HOST_5 + "/x"), 140, 3));
+
+     final int reducerCnt = 8;
+
+     HadoopPlannerMockJob mockJob = new HadoopPlannerMockJob(blocks, reducerCnt);
+
+     IgniteHadoopWeightedMapReducePlanner weightedPlanner = createPlanner(fs);
+
+     final HadoopMapReducePlan res = weightedPlanner.preparePlan(mockJob, NODES, null);
+
+     checkPlanMappers(res, blocks, NODES, true);
+
+     checkPlanReducers(res, NODES, reducerCnt, true);
+ }
+
+ /**
+ * Get all IDs.
+ *
+ * @param nodes Nodes.
+ * @return IDs.
+ */
+ private static Set<UUID> allIds(Collection<ClusterNode> nodes) {
+ Set<UUID> allIds = new HashSet<>();
+
+ for (ClusterNode n : nodes)
+ allIds.add(n.id());
+
+ return allIds;
+ }
+
+ /**
+  * Check mappers for the plan.
+  *
+  * @param plan Plan.
+  * @param splits Splits.
+  * @param nodes Nodes.
+  * @param expectUniformity Whether mappers are expected to be spread over all the nodes.
+  */
+ private static void checkPlanMappers(HadoopMapReducePlan plan, List<HadoopInputSplit> splits,
+     Collection<ClusterNode> nodes, boolean expectUniformity) {
+     // Number of mappers should correspond to the number of input splits:
+     assertEquals(splits.size(), plan.mappers());
+
+     if (expectUniformity) {
+         // Mappers are assigned to all available nodes:
+         assertEquals(nodes.size(), plan.mapperNodeIds().size());
+
+         // ... and to exactly those nodes:
+         assertEquals(allIds(nodes), plan.mapperNodeIds());
+     }
+
+     // Check all splits are covered by mappers:
+     Set<HadoopInputSplit> covered = new HashSet<>();
+
+     for (UUID id : plan.mapperNodeIds()) {
+         Collection<HadoopInputSplit> sp = plan.mappers(id);
+
+         assertNotNull("No splits for mapper node: " + id, sp);
+
+         for (HadoopInputSplit s : sp)
+             assertTrue("Split is assigned more than once: " + s, covered.add(s));
+     }
+
+     // Must be of the same size & contain same elements (expected value goes first):
+     assertEquals(new HashSet<>(splits), covered);
+ }
+
+ /**
+  * Check plan reducers.
+  *
+  * @param plan Plan.
+  * @param nodes Nodes.
+  * @param expReducers Expected reducers.
+  * @param expectUniformity Whether reducers are expected to be assigned to every node.
+  */
+ private static void checkPlanReducers(HadoopMapReducePlan plan,
+     Collection<ClusterNode> nodes, int expReducers, boolean expectUniformity) {
+     assertEquals(expReducers, plan.reducers());
+
+     if (expectUniformity)
+         assertEquals(allIds(nodes), plan.reducerNodeIds());
+
+     // Reducer indexes seen so far, across all nodes.
+     Set<Integer> seen = new HashSet<>();
+     int lenSum = 0;
+
+     for (UUID uuid : plan.reducerNodeIds()) {
+         int[] rr = plan.reducers(uuid);
+
+         assertNotNull("No reducers for reducer node: " + uuid, rr);
+
+         lenSum += rr.length;
+
+         for (int i : rr) {
+             // A sum-only check would accept e.g. {0, 0, 3}; verify range and uniqueness explicitly.
+             assertTrue("Reducer index out of range: " + i, i >= 0 && i < expReducers);
+             assertTrue("Duplicate reducer index: " + i, seen.add(i));
+         }
+     }
+
+     // Unique in-range indexes whose count equals expReducers are exactly 0 .. expReducers - 1.
+     assertEquals(expReducers, lenSum);
+ }
+
+ /**
+  * Creates a weighted planner backed by the given IGFS mock.
+  *
+  * @param igfs IGFS.
+  * @return Planner.
+  */
+ private static IgniteHadoopWeightedMapReducePlanner createPlanner(IgfsMock igfs) {
+     IgniteHadoopWeightedMapReducePlanner res = new IgniteHadoopWeightedMapReducePlanner();
+     IgfsIgniteMock igniteMock = new IgfsIgniteMock(null, igfs);
+
+     // Set the planner's "ignite" field (looked up on the abstract base planner class) to the mock.
+     GridTestUtils.setFieldValue(res, HadoopAbstractMapReducePlanner.class, "ignite", igniteMock);
+
+     return res;
+ }
+
+ /**
+ * Unconditionally throws {@link UnsupportedOperationException}; called by mock methods that must not be used.
+ */
+ private static void throwUnsupported() {
+ throw new UnsupportedOperationException("Should not be called!");
+ }
+
+ /**
+ * Mocked node: supports only ID, addresses and the MACs attribute; every other method throws.
+ */
+ private static class MockNode implements ClusterNode {
+ /** Node ID. */
+ private final UUID id;
+
+ /** Value returned for the {@link IgniteNodeAttributes#ATTR_MACS} attribute. */
+ private final String macs;
+
+ /** Addresses (fixed-size list view over the constructor varargs). */
+ private final List<String> addrs;
+
+ /**
+ * Constructor.
+ *
+ * @param id Node ID.
+ * @param macs MAC addresses.
+ * @param addrs Addresses.
+ */
+ public MockNode(UUID id, String macs, String... addrs) {
+ assert addrs != null;
+
+ this.id = id;
+ this.macs = macs;
+
+ this.addrs = Arrays.asList(addrs);
+ }
+
+ /** {@inheritDoc} */
+ @Override public UUID id() {
+ return id;
+ }
+
+ /** {@inheritDoc} Only the MACs attribute is supported; any other name throws. */
+ @SuppressWarnings("unchecked")
+ @Nullable @Override public <T> T attribute(String name) {
+ if (F.eq(name, IgniteNodeAttributes.ATTR_MACS))
+ return (T)macs;
+
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} */
+ @Override public Collection<String> addresses() {
+ return addrs;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public Object consistentId() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public ClusterMetrics metrics() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public Map<String, Object> attributes() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public Collection<String> hostNames() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public long order() {
+ throwUnsupported();
+
+ return 0;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public IgniteProductVersion version() {
+ throwUnsupported();
+
+ return null;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public boolean isLocal() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public boolean isDaemon() {
+ throwUnsupported();
+
+ return false;
+ }
+
+ /** {@inheritDoc} Not supported by this mock. */
+ @Override public boolean isClient() {
+ throwUnsupported();
+
+ return false;
+ }
+ }
+
+ /**
+  * Builder of block locations: a sorted map from block start offset to the nodes owning that block.
+  */
+ private static class LocationsBuilder {
+     /** Block start offset -> nodes owning the block. */
+     private final TreeMap<Long, Collection<MockNode>> startToNodes = new TreeMap<>();
+
+     /**
+      * Factory method.
+      *
+      * @return New, empty locations builder.
+      */
+     public static LocationsBuilder create() {
+         return new LocationsBuilder();
+     }
+
+     /**
+      * Registers the nodes owning the block that begins at the given offset.
+      *
+      * @param start Start.
+      * @param nodes Nodes.
+      * @return This builder for chaining.
+      */
+     public LocationsBuilder add(long start, MockNode... nodes) {
+         Collection<MockNode> owners = Arrays.asList(nodes);
+         startToNodes.put(start, owners);
+         return this;
+     }
+
+     /**
+      * Returns the accumulated locations (the builder's own map, not a copy).
+      *
+      * @return Locations.
+      */
+     public TreeMap<Long, Collection<MockNode>> build() {
+         return startToNodes;
+     }
+
+     /**
+      * Creates a mocked IGFS on top of the accumulated locations.
+      *
+      * @return IGFS.
+      */
+     public MockIgfs buildIgfs() {
+         return new MockIgfs(startToNodes);
+     }
+ }
+
+ /**
+  * Mocked IGFS whose affinity is defined by a sorted map of block start offsets to owning nodes.
+  */
+ private static class MockIgfs extends IgfsMock {
+     /** Block locations: block start offset -> nodes owning the block. */
+     private final TreeMap<Long, Collection<MockNode>> locs;
+
+     /**
+      * Constructor.
+      *
+      * @param locs Block locations.
+      */
+     public MockIgfs(TreeMap<Long, Collection<MockNode>> locs) {
+         super("igfs");
+
+         this.locs = locs;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Collection<IgfsBlockLocation> affinity(IgfsPath path, long start, long len) {
+         Collection<IgfsBlockLocation> res = new ArrayList<>();
+
+         long cur = start;
+         long remaining = len;
+
+         // Owners of the location block that precedes the entry currently being visited.
+         Collection<MockNode> prevLocNodes = null;
+
+         // TreeMap entries are visited in ascending order of block start.
+         for (Map.Entry<Long, Collection<MockNode>> locEntry : locs.entrySet()) {
+             long locStart = locEntry.getKey();
+             Collection<MockNode> locNodes = locEntry.getValue();
+
+             if (prevLocNodes != null && cur < locStart) {
+                 // Part of the request covered by the previous block: it is measured from 'cur' (which
+                 // may lie inside that block) and must not extend past the end of the requested range.
+                 long prevLen = Math.min(locStart - cur, remaining);
+
+                 res.add(new IgfsBlockLocationMock(cur, prevLen, prevLocNodes));
+
+                 cur += prevLen;
+                 remaining -= prevLen;
+             }
+
+             prevLocNodes = locNodes;
+
+             // Whole requested range is covered, no need to look at further blocks.
+             if (remaining == 0)
+                 break;
+         }
+
+         // Add remainder: whatever is left is attributed to the last visited block.
+         if (remaining != 0)
+             res.add(new IgfsBlockLocationMock(cur, remaining, prevLocNodes));
+
+         return res;
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean exists(IgfsPath path) {
+         return true;
+     }
+
+     /** {@inheritDoc} */
+     @Override public boolean isProxy(URI path) {
+         return false;
+     }
+ }
+
+ /**
+  * Mocked block location: exposes start, length and owning node IDs; names and hosts are unsupported.
+  */
+ private static class IgfsBlockLocationMock implements IgfsBlockLocation {
+     /** Start offset of the block. */
+     private final long startOff;
+
+     /** Length of the block. */
+     private final long length;
+
+     /** IDs of the nodes owning the block. */
+     private final List<UUID> ids;
+
+     /**
+      * Constructor.
+      *
+      * @param start Start.
+      * @param len Length.
+      * @param nodes Nodes.
+      */
+     public IgfsBlockLocationMock(long start, long len, Collection<MockNode> nodes) {
+         ids = new ArrayList<>(nodes.size());
+
+         for (MockNode n : nodes)
+             ids.add(n.id());
+
+         startOff = start;
+         length = len;
+     }
+
+     /** {@inheritDoc} */
+     @Override public long start() {
+         return startOff;
+     }
+
+     /** {@inheritDoc} */
+     @Override public long length() {
+         return length;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Collection<UUID> nodeIds() {
+         return ids;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Collection<String> names() {
+         throwUnsupported();
+
+         return null;
+     }
+
+     /** {@inheritDoc} */
+     @Override public Collection<String> hosts() {
+         throwUnsupported();
+
+         return null;
+     }
+ }
+}
http://git-wip-us.apache.org/repos/asf/ignite/blob/11b00873/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
----------------------------------------------------------------------
diff --git a/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
new file mode 100644
index 0000000..13f00bd
--- /dev/null
+++ b/modules/hadoop-impl/src/test/java/org/apache/ignite/internal/processors/hadoop/impl/HadoopWeightedPlannerMapReduceTest.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.processors.hadoop.impl;
+
+import org.apache.ignite.configuration.HadoopConfiguration;
+import org.apache.ignite.hadoop.mapreduce.IgniteHadoopWeightedMapReducePlanner;
+
+/**
+ * Tests whole map-reduce execution with the weighted planner.
+ */
+public class HadoopWeightedPlannerMapReduceTest extends HadoopMapReduceTest {
+    /** {@inheritDoc} */
+    @Override protected HadoopConfiguration createHadoopConfiguration() {
+        // Weighted planner, created with its default settings.
+        IgniteHadoopWeightedMapReducePlanner weighted = new IgniteHadoopWeightedMapReducePlanner();
+
+        HadoopConfiguration cfg = new HadoopConfiguration();
+
+        cfg.setMapReducePlanner(weighted);
+
+        return cfg;
+    }
+}