You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ignite.apache.org by vo...@apache.org on 2016/11/22 13:58:45 UTC
ignite git commit: Switched to hashmap for mapper.
Repository: ignite
Updated Branches:
refs/heads/ignite-1.6.10-hadoop-debug 5c733d5a9 -> 84b23474b
Switched to hashmap for mapper.
Project: http://git-wip-us.apache.org/repos/asf/ignite/repo
Commit: http://git-wip-us.apache.org/repos/asf/ignite/commit/84b23474
Tree: http://git-wip-us.apache.org/repos/asf/ignite/tree/84b23474
Diff: http://git-wip-us.apache.org/repos/asf/ignite/diff/84b23474
Branch: refs/heads/ignite-1.6.10-hadoop-debug
Commit: 84b23474ba9917f5160691fa677d7736a602b329
Parents: 5c733d5
Author: devozerov <vo...@gridgain.com>
Authored: Tue Nov 22 16:58:38 2016 +0300
Committer: devozerov <vo...@gridgain.com>
Committed: Tue Nov 22 16:58:38 2016 +0300
----------------------------------------------------------------------
.../processors/hadoop/shuffle/HadoopShuffleJob.java | 11 +++++++----
1 file changed, 7 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/ignite/blob/84b23474/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
----------------------------------------------------------------------
diff --git a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
index f586234..adca7b9 100644
--- a/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
+++ b/modules/hadoop/src/main/java/org/apache/ignite/internal/processors/hadoop/shuffle/HadoopShuffleJob.java
@@ -211,13 +211,16 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
/**
* @param maps Maps.
* @param idx Index.
+ * @param mapper If map is requested by mapper.
* @return Map.
*/
- private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> maps, int idx) {
+ private HadoopMultimap getOrCreateMap(AtomicReferenceArray<HadoopMultimap> maps, int idx, boolean mapper) {
HadoopMultimap map = maps.get(idx);
if (map == null) { // Create new map.
- map = get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false) ?
+ boolean hash = mapper || get(job.info(), SHUFFLE_REDUCER_NO_SORTING, false);
+
+ map = hash ?
new HadoopConcurrentHashMultimap(job.info(), mem, get(job.info(), PARTITION_HASHMAP_SIZE, 8 * 1024)):
new HadoopSkipList(job.info(), mem);
@@ -245,7 +248,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
perfCntr.onShuffleMessage(msg.reducer(), U.currentTimeMillis());
- HadoopMultimap map = getOrCreateMap(maps, msg.reducer());
+ HadoopMultimap map = getOrCreateMap(maps, msg.reducer(), false);
// Add data from message to the map.
try (HadoopMultimap.Adder adder = map.startAdding(taskCtx)) {
@@ -612,7 +615,7 @@ public class HadoopShuffleJob<T> implements AutoCloseable {
HadoopTaskOutput out = adders[part];
if (out == null)
- adders[part] = out = getOrCreateMap(maps, part).startAdding(taskCtx);
+ adders[part] = out = getOrCreateMap(maps, part, true).startAdding(taskCtx);
out.write(key, val);
}