You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/30 05:51:23 UTC

drill git commit: DRILL-2916: Fix bug in assignment code

Repository: drill
Updated Branches:
  refs/heads/master e428f0d56 -> f29444fa2


DRILL-2916: Fix bug in assignment code


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f29444fa
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f29444fa
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f29444fa

Branch: refs/heads/master
Commit: f29444fa2eb28f94e6fb37cda058ced470ec452e
Parents: e428f0d
Author: Steven Phillips <sm...@apache.org>
Authored: Wed Apr 29 18:06:40 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 29 19:20:17 2015 -0700

----------------------------------------------------------------------
 .../exec/store/schedule/AssignmentCreator.java  | 51 ++++++++--
 .../drill/exec/store/store/TestAssignment.java  | 98 ++++++++++++++++++++
 2 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/f29444fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index b5eee5d..bfc104f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
 import com.carrotsearch.hppc.cursors.ObjectLongCursor;
 import com.google.common.collect.Iterators;
 import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 
 import com.google.common.base.Stopwatch;
@@ -64,6 +65,11 @@ public class AssignmentCreator<T extends CompleteWork> {
   private List<T> units;
 
   /**
+   * Assigned work units, keyed by minor fragment id
+   */
+  private ArrayListMultimap<Integer, T> mappings = ArrayListMultimap.create();
+
+  /**
    * A list of DrillbitEndpoints, where the index in the list corresponds to the minor fragment id
    */
   private List<DrillbitEndpoint> incomingEndpoints;
@@ -97,10 +103,31 @@ public class AssignmentCreator<T extends CompleteWork> {
     watch.start();
     maxWork = (int) Math.ceil(units.size() / ((float) incomingEndpoints.size()));
     LinkedList<WorkEndpointListPair<T>> workList = getWorkList();
-    LinkedList<WorkEndpointListPair<T>> unassignedWorkList = Lists.newLinkedList();
+    LinkedList<WorkEndpointListPair<T>> unassignedWorkList;
     Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = getEndpointIterators();
-    ArrayListMultimap<Integer, T> mappings = ArrayListMultimap.create();
 
+    unassignedWorkList = assign(workList, endpointIterators, true);
+
+    assignLeftovers(unassignedWorkList, endpointIterators, true);
+    assignLeftovers(unassignedWorkList, endpointIterators, false);
+
+    if (unassignedWorkList.size() != 0) {
+      throw new DrillRuntimeException("There are still unassigned work units");
+    }
+
+    logger.debug("Took {} ms to assign {} work units to {} fragments", watch.elapsed(TimeUnit.MILLISECONDS), units.size(), incomingEndpoints.size());
+    return mappings;
+  }
+
+  /**
+   * Assigns each work unit to a fragment on one of its preferred endpoints, where capacity allows.
+   * @param workList the list of work units to assign
+   * @param endpointIterators the endpointIterators to assign to
+   * @param assignMinimum whether to assign only up to the minimum required
+   * @return a list of unassigned work units
+   */
+  private LinkedList<WorkEndpointListPair<T>> assign(List<WorkEndpointListPair<T>> workList, Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean assignMinimum) {
+    LinkedList<WorkEndpointListPair<T>> currentUnassignedList = Lists.newLinkedList();
     outer: for (WorkEndpointListPair<T> workPair : workList) {
       List<DrillbitEndpoint> endpoints = workPair.sortedEndpoints;
       for (DrillbitEndpoint endpoint : endpoints) {
@@ -108,18 +135,27 @@ public class AssignmentCreator<T extends CompleteWork> {
         if (iteratorWrapper == null) {
           continue;
         }
-        if (iteratorWrapper.count < iteratorWrapper.maxCount) {
+        if (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount)) {
           Integer assignment = iteratorWrapper.iter.next();
           iteratorWrapper.count++;
           mappings.put(assignment, workPair.work);
           continue outer;
         }
       }
-      unassignedWorkList.add(workPair);
+      currentUnassignedList.add(workPair);
     }
+    return currentUnassignedList;
+  }
 
+  /**
+   * Assigns leftover work units to any endpoint still below its limit, regardless of locality.
+   * @param unassignedWorkList the work units to assign
+   * @param endpointIterators the endpointIterators to assign to
+   * @param assignMinimum whether to assign only up to the minimum required
+   */
+  private void assignLeftovers(LinkedList<WorkEndpointListPair<T>> unassignedWorkList, Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean assignMinimum) {
     outer: for (FragIteratorWrapper iteratorWrapper : endpointIterators.values()) {
-      while (iteratorWrapper.count < iteratorWrapper.maxCount) {
+      while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount)) {
         WorkEndpointListPair<T> workPair = unassignedWorkList.poll();
         if (workPair == null) {
           break outer;
@@ -129,9 +165,6 @@ public class AssignmentCreator<T extends CompleteWork> {
         mappings.put(assignment, workPair.work);
       }
     }
-
-    logger.debug("Took {} ms to assign {} work units to {} fragments", watch.elapsed(TimeUnit.MILLISECONDS), units.size(), incomingEndpoints.size());
-    return mappings;
   }
 
   /**
@@ -214,6 +247,7 @@ public class AssignmentCreator<T extends CompleteWork> {
       FragIteratorWrapper wrapper = new FragIteratorWrapper();
       wrapper.iter = Iterators.cycle(mmap.get(endpoint));
       wrapper.maxCount = maxWork * mmap.get(endpoint).size();
+      wrapper.minCount = Math.max((maxWork - 1) * mmap.get(endpoint).size(), 1);
       map.put(endpoint, wrapper);
     }
     return map;
@@ -226,6 +260,7 @@ public class AssignmentCreator<T extends CompleteWork> {
   private static class FragIteratorWrapper {
     int count = 0;
     int maxCount;
+    int minCount;
     Iterator<Integer> iter;
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/f29444fa/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
new file mode 100644
index 0000000..65d8cf7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.store;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteFileWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+
+/** Regression test for DRILL-2916: every minor fragment must be assigned at least one work unit. */
+public class TestAssignment {
+
+  private static final long FILE_SIZE = 1000;
+  /** Fixed seed so each chunk is placed on the same endpoints on every run. */
+  private static final long SEED = 2916L;
+  private static List<DrillbitEndpoint> endpoints;
+  private final Random random = new Random(SEED);
+
+  /** Creates 30 endpoints named node2 through node31. */
+  @BeforeClass
+  public static void setup() {
+    endpoints = Lists.newArrayList();
+    final String pattern = "node%d";
+    for (int i = 2; i < 32; i++) {
+      String host = String.format(pattern, i);
+      endpoints.add(DrillbitEndpoint.newBuilder().setAddress(host).build());
+    }
+  }
+
+  /** 1000 chunks over 840 fragments gives maxWork == 2; no fragment may be left without work. */
+  @Test
+  public void manyFiles() {
+    List<CompleteFileWork> chunks = generateChunks(1000);
+
+    Iterator<DrillbitEndpoint> incomingEndpointsIterator = Iterators.cycle(endpoints);
+    List<DrillbitEndpoint> incomingEndpoints = Lists.newArrayList();
+    // 28 minor fragments on each of the 30 endpoints.
+    final int width = 28 * 30;
+    for (int i = 0; i < width; i++) {
+      incomingEndpoints.add(incomingEndpointsIterator.next());
+    }
+
+    ListMultimap<Integer, CompleteFileWork> mappings = AssignmentCreator.getMappings(incomingEndpoints, chunks);
+    for (int i = 0; i < width; i++) {
+      // ListMultimap.get never returns null; an unassigned fragment yields an empty list.
+      Assert.assertFalse("no mapping for entry " + i, mappings.get(i).isEmpty());
+    }
+  }
+
+  private List<CompleteFileWork> generateChunks(int chunks) {
+    List<CompleteFileWork> chunkList = Lists.newArrayList();
+    for (int i = 0; i < chunks; i++) {
+      chunkList.add(new CompleteFileWork(createByteMap(), 0, FILE_SIZE, "file" + i));
+    }
+    return chunkList;
+  }
+
+  /** Places a chunk on 3 distinct randomly chosen endpoints, each credited with FILE_SIZE bytes. */
+  private EndpointByteMap createByteMap() {
+    EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
+    Set<DrillbitEndpoint> usedEndpoints = Sets.newHashSet();
+    while (usedEndpoints.size() < 3) {
+      usedEndpoints.add(endpoints.get(random.nextInt(endpoints.size())));
+    }
+    for (DrillbitEndpoint ep : usedEndpoints) {
+      endpointByteMap.add(ep, FILE_SIZE);
+    }
+    return endpointByteMap;
+  }
+}