You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by hy...@apache.org on 2014/08/11 03:55:32 UTC
git commit: TAJO-996: Sometimes,
scheduleFetchesByEvenDistributedVolumes loses some FetchImpls.
Repository: tajo
Updated Branches:
refs/heads/master 912df6f08 -> 189cf3ffe
TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses some FetchImpls.
Closes #109
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/189cf3ff
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/189cf3ff
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/189cf3ff
Branch: refs/heads/master
Commit: 189cf3ffe9d1b5c625c553f27fc5be37cd7522be
Parents: 912df6f
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Aug 11 10:54:20 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Aug 11 10:54:20 2014 +0900
----------------------------------------------------------------------
CHANGES | 3 ++
.../tajo/master/querymaster/Repartitioner.java | 1 +
.../java/org/apache/tajo/worker/FetchImpl.java | 23 +++++++++
.../apache/tajo/master/TestRepartitioner.java | 52 +++++++++++++++-----
4 files changed, 68 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e1d6d03..7487983 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,9 @@ Release 0.9.0 - unreleased
BUG FIXES
+ TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses
+ some FetchImpls. (hyunsik)
+
TAJO-949: PullServer does not release files, when a channel throws
an internal exception. (jinho)
http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 43d6fd2..fa1ed4c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -831,6 +831,7 @@ public class Repartitioner {
while (p > 0 && iterator.hasNext()) {
FetchGroupMeta fetchGroupMeta = iterator.next();
assignedVolumes[p] += fetchGroupMeta.getVolume();
+ TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
// While the current one is smaller than next one, it adds additional fetches to current one.
while(iterator.hasNext() && assignedVolumes[p - 1] > assignedVolumes[p]) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 869c106..7baae64 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -26,6 +26,7 @@ import org.apache.tajo.common.ProtoObject;
import org.apache.tajo.ipc.TajoWorkerProtocol;
import org.apache.tajo.master.querymaster.QueryUnit;
import org.apache.tajo.master.querymaster.Repartitioner;
+import org.apache.tajo.util.TUtil;
import java.net.URI;
import java.util.ArrayList;
@@ -220,4 +221,26 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
}
return newFetchImpl;
}
+
+ /**
+  * Compares this fetch with another object field by field.
+  *
+  * Two fetches are equal only when they are of exactly the same class and
+  * hasNext, partitionId, attemptIds, executionBlockId, host, name,
+  * rangeParams, taskIds and type all match according to TUtil.checkEquals.
+  *
+  * @param o the object to compare against; may be null
+  * @return true if o is a FetchImpl whose compared fields all match
+  */
+ @Override
+ public boolean equals(Object o) {
+   if (this == o) {
+     return true;
+   }
+   if (o == null || getClass() != o.getClass()) {
+     return false;
+   }
+
+   FetchImpl fetch = (FetchImpl) o;
+
+   return TUtil.checkEquals(hasNext, fetch.hasNext) &&
+       TUtil.checkEquals(partitionId, fetch.partitionId) &&
+       TUtil.checkEquals(attemptIds, fetch.attemptIds) &&
+       TUtil.checkEquals(executionBlockId, fetch.executionBlockId) &&
+       TUtil.checkEquals(host, fetch.host) &&
+       TUtil.checkEquals(name, fetch.name) &&
+       TUtil.checkEquals(rangeParams, fetch.rangeParams) &&
+       TUtil.checkEquals(taskIds, fetch.taskIds) &&
+       TUtil.checkEquals(type, fetch.type);
+ }
+
+ /**
+  * Hash code derived from exactly the fields that equals(Object) compares,
+  * so that equal fetches fall into the same bucket of a HashSet/HashMap.
+  *
+  * NOTE(review): this assumes each field type's own hashCode agrees with
+  * what TUtil.checkEquals treats as equal -- confirm for the project types.
+  * The value changes if any of these fields is modified after the object
+  * has been inserted into a hash-based collection.
+  *
+  * @return a hash code consistent with equals(Object)
+  */
+ @Override
+ public int hashCode() {
+   // deepHashCode tolerates null elements and hashes nested arrays by content.
+   return java.util.Arrays.deepHashCode(new Object[] {
+       hasNext, partitionId, attemptIds, executionBlockId, host, name, rangeParams, taskIds, type
+   });
+ }
}
http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 009c02e..29aeccd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -19,7 +19,9 @@
package org.apache.tajo.master;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.LocalTajoTestingUtility;
import org.apache.tajo.QueryId;
import org.apache.tajo.TestTajoIds;
import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -32,12 +34,10 @@ import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.junit.Test;
import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import static junit.framework.Assert.assertEquals;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta;
import static org.junit.Assert.assertTrue;
@@ -55,7 +55,7 @@ public class TestRepartitioner {
intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, new QueryUnit.PullHost(hostName, port)));
}
- FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE,
+ FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
sid, partitionId, intermediateEntries);
fetch.setName(sid.toString());
@@ -95,38 +95,49 @@ public class TestRepartitioner {
String tableName = "test1";
- fetchGroups.put(0, new FetchGroupMeta(100, new FetchImpl()));
- fetchGroups.put(1, new FetchGroupMeta(80, new FetchImpl()));
- fetchGroups.put(2, new FetchGroupMeta(70, new FetchImpl()));
- fetchGroups.put(3, new FetchGroupMeta(30, new FetchImpl()));
- fetchGroups.put(4, new FetchGroupMeta(10, new FetchImpl()));
- fetchGroups.put(5, new FetchGroupMeta(5, new FetchImpl()));
+ ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+ FetchImpl [] fetches = new FetchImpl[12];
+ for (int i = 0; i < 12; i++) {
+ fetches[i] = new FetchImpl(new QueryUnit.PullHost("localhost", 10000 + i), ShuffleType.HASH_SHUFFLE, ebId, i / 2);
+ }
+
+ int [] VOLUMES = {100, 80, 70, 30, 10, 5};
+
+ for (int i = 0; i < 12; i += 2) {
+ fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1]));
+ }
Pair<Long [], Map<String, List<FetchImpl>>[]> results;
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1);
long expected [] = {100 + 80 + 70 + 30 + 10 + 5};
assertFetchVolumes(expected, results.getFirst());
+ assertFetchImpl(fetches, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2);
long expected0 [] = {130, 165};
assertFetchVolumes(expected0, results.getFirst());
+ assertFetchImpl(fetches, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3);
long expected1 [] = {100, 95, 100};
assertFetchVolumes(expected1, results.getFirst());
+ assertFetchImpl(fetches, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4);
long expected2 [] = {100, 80, 70, 45};
assertFetchVolumes(expected2, results.getFirst());
+ assertFetchImpl(fetches, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5);
long expected3 [] = {100, 80, 70, 30, 15};
assertFetchVolumes(expected3, results.getFirst());
+ assertFetchImpl(fetches, results.getSecond());
results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6);
long expected4 [] = {100, 80, 70, 30, 10, 5};
assertFetchVolumes(expected4, results.getFirst());
+ assertFetchImpl(fetches, results.getSecond());
}
private static void assertFetchVolumes(long [] expected, Long [] results) {
@@ -136,4 +147,23 @@ public class TestRepartitioner {
assertTrue(expected[i] + " is expected, but " + results[i], expected[i] == results[i]);
}
}
+
+ /**
+  * Asserts that the fetches spread over all result slots are, as a set,
+  * the same as the fetches that were handed to the repartitioner.
+  *
+  * @param expected every FetchImpl that was put into the fetch groups
+  * @param result   one table-name-to-fetch-list map per output slot
+  */
+ private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) {
+   Set<FetchImpl> wanted = Sets.newHashSet();
+   Collections.addAll(wanted, expected);
+
+   // Flatten every list of every slot into a single set.
+   Set<FetchImpl> collected = Sets.newHashSet();
+   for (Map<String, List<FetchImpl>> slot : result) {
+     for (List<FetchImpl> assigned : slot.values()) {
+       collected.addAll(assigned);
+     }
+   }
+
+   // Size is checked first so a lost fetch shows up as a simple count mismatch.
+   assertEquals(wanted.size(), collected.size());
+   assertEquals(wanted, collected);
+ }
}