You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/11 10:11:38 UTC

[09/17] git commit: TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses some FetchImpls.

TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses some FetchImpls.

Closes #109


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/189cf3ff
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/189cf3ff
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/189cf3ff

Branch: refs/heads/index_support
Commit: 189cf3ffe9d1b5c625c553f27fc5be37cd7522be
Parents: 912df6f
Author: Hyunsik Choi <hy...@apache.org>
Authored: Mon Aug 11 10:54:20 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Mon Aug 11 10:54:20 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |  3 ++
 .../tajo/master/querymaster/Repartitioner.java  |  1 +
 .../java/org/apache/tajo/worker/FetchImpl.java  | 23 +++++++++
 .../apache/tajo/master/TestRepartitioner.java   | 52 +++++++++++++++-----
 4 files changed, 68 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index e1d6d03..7487983 100644
--- a/CHANGES
+++ b/CHANGES
@@ -106,6 +106,9 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-996: Sometimes, scheduleFetchesByEvenDistributedVolumes loses
+    some FetchImpls. (hyunsik)
+
     TAJO-949: PullServer does not release files, when a channel throws 
     an internal exception. (jinho)
     

http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
index 43d6fd2..fa1ed4c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java
@@ -831,6 +831,7 @@ public class Repartitioner {
       while (p > 0 && iterator.hasNext()) {
         FetchGroupMeta fetchGroupMeta = iterator.next();
         assignedVolumes[p] += fetchGroupMeta.getVolume();
+        TUtil.putCollectionToNestedList(fetchesArray[p], tableName, fetchGroupMeta.fetchUrls);
 
         // While the current one is smaller than next one, it adds additional fetches to current one.
         while(iterator.hasNext() && assignedVolumes[p - 1] > assignedVolumes[p]) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
index 869c106..7baae64 100644
--- a/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
+++ b/tajo-core/src/main/java/org/apache/tajo/worker/FetchImpl.java
@@ -26,6 +26,7 @@ import org.apache.tajo.common.ProtoObject;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
 import org.apache.tajo.master.querymaster.QueryUnit;
 import org.apache.tajo.master.querymaster.Repartitioner;
+import org.apache.tajo.util.TUtil;
 
 import java.net.URI;
 import java.util.ArrayList;
@@ -220,4 +221,26 @@ public class FetchImpl implements ProtoObject<TajoWorkerProtocol.FetchProto>, Cl
     }
     return newFetchImpl;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FetchImpl fetch = (FetchImpl) o;
+
+    return TUtil.checkEquals(hasNext, fetch.hasNext) &&
+        TUtil.checkEquals(partitionId, fetch.partitionId) &&
+        TUtil.checkEquals(attemptIds, fetch.attemptIds) &&
+        TUtil.checkEquals(executionBlockId, fetch.executionBlockId) &&
+        TUtil.checkEquals(host, fetch.host) &&
+        TUtil.checkEquals(name, fetch.name) &&
+        TUtil.checkEquals(rangeParams, fetch.rangeParams) &&
+        TUtil.checkEquals(taskIds, fetch.taskIds) &&
+        TUtil.checkEquals(type, fetch.type);
+  }
 }

http://git-wip-us.apache.org/repos/asf/tajo/blob/189cf3ff/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
index 009c02e..29aeccd 100644
--- a/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
+++ b/tajo-core/src/test/java/org/apache/tajo/master/TestRepartitioner.java
@@ -19,7 +19,9 @@
 package org.apache.tajo.master;
 
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import org.apache.tajo.ExecutionBlockId;
+import org.apache.tajo.LocalTajoTestingUtility;
 import org.apache.tajo.QueryId;
 import org.apache.tajo.TestTajoIds;
 import org.apache.tajo.ipc.TajoWorkerProtocol;
@@ -32,12 +34,10 @@ import org.jboss.netty.handler.codec.http.QueryStringDecoder;
 import org.junit.Test;
 
 import java.net.URI;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 import static junit.framework.Assert.assertEquals;
+import static org.apache.tajo.ipc.TajoWorkerProtocol.ShuffleType;
 import static org.apache.tajo.master.querymaster.Repartitioner.FetchGroupMeta;
 import static org.junit.Assert.assertTrue;
 
@@ -55,7 +55,7 @@ public class TestRepartitioner {
       intermediateEntries.add(new QueryUnit.IntermediateEntry(i, 0, partitionId, new QueryUnit.PullHost(hostName, port)));
     }
 
-    FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), TajoWorkerProtocol.ShuffleType.HASH_SHUFFLE,
+    FetchImpl fetch = new FetchImpl(new QueryUnit.PullHost(hostName, port), ShuffleType.HASH_SHUFFLE,
         sid, partitionId, intermediateEntries);
 
     fetch.setName(sid.toString());
@@ -95,38 +95,49 @@ public class TestRepartitioner {
     String tableName = "test1";
 
 
-    fetchGroups.put(0, new FetchGroupMeta(100, new FetchImpl()));
-    fetchGroups.put(1, new FetchGroupMeta(80, new FetchImpl()));
-    fetchGroups.put(2, new FetchGroupMeta(70, new FetchImpl()));
-    fetchGroups.put(3, new FetchGroupMeta(30, new FetchImpl()));
-    fetchGroups.put(4, new FetchGroupMeta(10, new FetchImpl()));
-    fetchGroups.put(5, new FetchGroupMeta(5, new FetchImpl()));
+    ExecutionBlockId ebId = new ExecutionBlockId(LocalTajoTestingUtility.newQueryId(), 0);
+    FetchImpl [] fetches = new FetchImpl[12];
+    for (int i = 0; i < 12; i++) {
+      fetches[i] = new FetchImpl(new QueryUnit.PullHost("localhost", 10000 + i), ShuffleType.HASH_SHUFFLE, ebId, i / 2);
+    }
+
+    int [] VOLUMES = {100, 80, 70, 30, 10, 5};
+
+    for (int i = 0; i < 12; i += 2) {
+      fetchGroups.put(i, new FetchGroupMeta(VOLUMES[i / 2], fetches[i]).addFetche(fetches[i + 1]));
+    }
 
     Pair<Long [], Map<String, List<FetchImpl>>[]> results;
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 1);
     long expected [] = {100 + 80 + 70 + 30 + 10 + 5};
     assertFetchVolumes(expected, results.getFirst());
+    assertFetchImpl(fetches, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 2);
     long expected0 [] = {130, 165};
     assertFetchVolumes(expected0, results.getFirst());
+    assertFetchImpl(fetches, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 3);
     long expected1 [] = {100, 95, 100};
     assertFetchVolumes(expected1, results.getFirst());
+    assertFetchImpl(fetches, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 4);
     long expected2 [] = {100, 80, 70, 45};
     assertFetchVolumes(expected2, results.getFirst());
+    assertFetchImpl(fetches, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 5);
     long expected3 [] = {100, 80, 70, 30, 15};
     assertFetchVolumes(expected3, results.getFirst());
+    assertFetchImpl(fetches, results.getSecond());
 
     results = Repartitioner.makeEvenDistributedFetchImpl(fetchGroups, tableName, 6);
     long expected4 [] = {100, 80, 70, 30, 10, 5};
     assertFetchVolumes(expected4, results.getFirst());
+    assertFetchImpl(fetches, results.getSecond());
   }
 
   private static void assertFetchVolumes(long [] expected, Long [] results) {
@@ -136,4 +147,23 @@ public class TestRepartitioner {
       assertTrue(expected[i] + " is expected, but " + results[i], expected[i] == results[i]);
     }
   }
+
+  private static void assertFetchImpl(FetchImpl [] expected, Map<String, List<FetchImpl>>[] result) {
+    Set<FetchImpl> expectedURLs = Sets.newHashSet();
+
+    for (FetchImpl f : expected) {
+      expectedURLs.add(f);
+    }
+
+    Set<FetchImpl> resultURLs = Sets.newHashSet();
+
+    for (Map<String, List<FetchImpl>> e : result) {
+      for (List<FetchImpl> list : e.values()) {
+        resultURLs.addAll(list);
+      }
+    }
+
+    assertEquals(expectedURLs.size(), resultURLs.size());
+    assertEquals(expectedURLs, resultURLs);
+  }
 }