You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/04/30 05:51:23 UTC
drill git commit: DRILL-2916: Fix bug in assignment code
Repository: drill
Updated Branches:
refs/heads/master e428f0d56 -> f29444fa2
DRILL-2916: Fix bug in assignment code
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/f29444fa
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/f29444fa
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/f29444fa
Branch: refs/heads/master
Commit: f29444fa2eb28f94e6fb37cda058ced470ec452e
Parents: e428f0d
Author: Steven Phillips <sm...@apache.org>
Authored: Wed Apr 29 18:06:40 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed Apr 29 19:20:17 2015 -0700
----------------------------------------------------------------------
.../exec/store/schedule/AssignmentCreator.java | 51 ++++++++--
.../drill/exec/store/store/TestAssignment.java | 98 ++++++++++++++++++++
2 files changed, 141 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/f29444fa/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
index b5eee5d..bfc104f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/schedule/AssignmentCreator.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import com.carrotsearch.hppc.cursors.ObjectLongCursor;
import com.google.common.collect.Iterators;
import com.google.common.collect.Maps;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import com.google.common.base.Stopwatch;
@@ -64,6 +65,11 @@ public class AssignmentCreator<T extends CompleteWork> {
private List<T> units;
/**
+ * Mappings
+ */
+ private ArrayListMultimap<Integer, T> mappings = ArrayListMultimap.create();
+
+ /**
* A list of DrillbitEndpoints, where the index in the list corresponds to the minor fragment id
*/
private List<DrillbitEndpoint> incomingEndpoints;
@@ -97,10 +103,31 @@ public class AssignmentCreator<T extends CompleteWork> {
watch.start();
maxWork = (int) Math.ceil(units.size() / ((float) incomingEndpoints.size()));
LinkedList<WorkEndpointListPair<T>> workList = getWorkList();
- LinkedList<WorkEndpointListPair<T>> unassignedWorkList = Lists.newLinkedList();
+ LinkedList<WorkEndpointListPair<T>> unassignedWorkList;
Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators = getEndpointIterators();
- ArrayListMultimap<Integer, T> mappings = ArrayListMultimap.create();
+ unassignedWorkList = assign(workList, endpointIterators, true);
+
+ assignLeftovers(unassignedWorkList, endpointIterators, true);
+ assignLeftovers(unassignedWorkList, endpointIterators, false);
+
+ if (unassignedWorkList.size() != 0) {
+ throw new DrillRuntimeException("There are still unassigned work units");
+ }
+
+ logger.debug("Took {} ms to assign {} work units to {} fragments", watch.elapsed(TimeUnit.MILLISECONDS), units.size(), incomingEndpoints.size());
+ return mappings;
+ }
+
+ /**
+ *
+ * @param workList the list of work units to assign
+ * @param endpointIterators the endpointIterators to assign to
+ * @param assignMinimum whether to assign only up to the minimum required
+ * @return a list of unassigned work units
+ */
+ private LinkedList<WorkEndpointListPair<T>> assign(List<WorkEndpointListPair<T>> workList, Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean assignMinimum) {
+ LinkedList<WorkEndpointListPair<T>> currentUnassignedList = Lists.newLinkedList();
outer: for (WorkEndpointListPair<T> workPair : workList) {
List<DrillbitEndpoint> endpoints = workPair.sortedEndpoints;
for (DrillbitEndpoint endpoint : endpoints) {
@@ -108,18 +135,27 @@ public class AssignmentCreator<T extends CompleteWork> {
if (iteratorWrapper == null) {
continue;
}
- if (iteratorWrapper.count < iteratorWrapper.maxCount) {
+ if (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount)) {
Integer assignment = iteratorWrapper.iter.next();
iteratorWrapper.count++;
mappings.put(assignment, workPair.work);
continue outer;
}
}
- unassignedWorkList.add(workPair);
+ currentUnassignedList.add(workPair);
}
+ return currentUnassignedList;
+ }
+ /**
+ *
+ * @param unassignedWorkList the work units to assign
+ * @param endpointIterators the endpointIterators to assign to
+ * @param assignMinimum wheterh to assign the minimum amount
+ */
+ private void assignLeftovers(LinkedList<WorkEndpointListPair<T>> unassignedWorkList, Map<DrillbitEndpoint,FragIteratorWrapper> endpointIterators, boolean assignMinimum) {
outer: for (FragIteratorWrapper iteratorWrapper : endpointIterators.values()) {
- while (iteratorWrapper.count < iteratorWrapper.maxCount) {
+ while (iteratorWrapper.count < (assignMinimum ? iteratorWrapper.minCount : iteratorWrapper.maxCount)) {
WorkEndpointListPair<T> workPair = unassignedWorkList.poll();
if (workPair == null) {
break outer;
@@ -129,9 +165,6 @@ public class AssignmentCreator<T extends CompleteWork> {
mappings.put(assignment, workPair.work);
}
}
-
- logger.debug("Took {} ms to assign {} work units to {} fragments", watch.elapsed(TimeUnit.MILLISECONDS), units.size(), incomingEndpoints.size());
- return mappings;
}
/**
@@ -214,6 +247,7 @@ public class AssignmentCreator<T extends CompleteWork> {
FragIteratorWrapper wrapper = new FragIteratorWrapper();
wrapper.iter = Iterators.cycle(mmap.get(endpoint));
wrapper.maxCount = maxWork * mmap.get(endpoint).size();
+ wrapper.minCount = Math.max((maxWork - 1) * mmap.get(endpoint).size(), 1);
map.put(endpoint, wrapper);
}
return map;
@@ -226,6 +260,7 @@ public class AssignmentCreator<T extends CompleteWork> {
/**
 * Per-endpoint assignment state: a cyclic iterator over the ids mapped to one endpoint
 * (presumably minor fragment ids - confirm against the mmap construction) plus the counters
 * that cap how many work units that endpoint receives.
 */
private static class FragIteratorWrapper {
int count = 0; // work units assigned through this wrapper so far; incremented on every assignment
int maxCount; // cap used when assignMinimum is false: maxWork * number of ids for this endpoint
+ int minCount; // cap used when assignMinimum is true: max((maxWork - 1) * number of ids, 1)
Iterator<Integer> iter; // Iterators.cycle over this endpoint's ids; next() picks the mapping key
}
http://git-wip-us.apache.org/repos/asf/drill/blob/f29444fa/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
new file mode 100644
index 0000000..65d8cf7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/store/TestAssignment.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.store;
+
+import com.google.common.collect.Iterators;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
+import org.apache.drill.exec.store.schedule.AssignmentCreator;
+import org.apache.drill.exec.store.schedule.CompleteFileWork;
+import org.apache.drill.exec.store.schedule.EndpointByteMap;
+import org.apache.drill.exec.store.schedule.EndpointByteMapImpl;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Regression test for DRILL-2916: verifies that {@link AssignmentCreator#getMappings} hands at
+ * least one work unit to every incoming minor fragment.
+ */
+public class TestAssignment {
+
+  /** Length given to every generated {@link CompleteFileWork} and byte count recorded per endpoint. */
+  private static final long FILE_SIZE = 1000;
+
+  /** Number of work units (one per synthetic file) generated by {@link #manyFiles()}. */
+  private static final int CHUNK_COUNT = 1000;
+
+  /** Number of incoming fragments placed on each endpoint by {@link #manyFiles()}. */
+  private static final int FRAGMENTS_PER_ENDPOINT = 28;
+
+  /** Number of distinct endpoints each work unit records bytes on. */
+  private static final int ENDPOINTS_PER_CHUNK = 3;
+
+  /** Endpoints node2 .. node31 (30 in total), populated once in {@link #setup()}. */
+  private static List<DrillbitEndpoint> endpoints;
+
+  @BeforeClass
+  public static void setup() {
+    endpoints = Lists.newArrayList();
+    final String pattern = "node%d";
+    for (int i = 2; i < 32; i++) {
+      String host = String.format(pattern, i);
+      endpoints.add(DrillbitEndpoint.newBuilder().setAddress(host).build());
+    }
+  }
+
+  /**
+   * Assigns {@link #CHUNK_COUNT} work units to {@link #FRAGMENTS_PER_ENDPOINT} fragments on each
+   * endpoint and checks that no fragment index is left without work.
+   */
+  @Test
+  public void manyFiles() throws Exception {
+    List<CompleteFileWork> chunks = generateChunks(CHUNK_COUNT);
+
+    // Round-robin the fragments over the endpoints so each endpoint hosts
+    // FRAGMENTS_PER_ENDPOINT of them (28 * 30 = 840 fragments in total).
+    Iterator<DrillbitEndpoint> incomingEndpointsIterator = Iterators.cycle(endpoints);
+    final int width = FRAGMENTS_PER_ENDPOINT * endpoints.size();
+    List<DrillbitEndpoint> incomingEndpoints = Lists.newArrayList();
+    for (int i = 0; i < width; i++) {
+      incomingEndpoints.add(incomingEndpointsIterator.next());
+    }
+
+    ListMultimap<Integer, CompleteFileWork> mappings =
+        AssignmentCreator.getMappings(incomingEndpoints, chunks);
+    // ListMultimap.get never returns null (an absent key yields an empty list),
+    // so emptiness is the only condition worth checking.
+    for (int i = 0; i < width; i++) {
+      Assert.assertFalse("no mapping for entry " + i, mappings.get(i).isEmpty());
+    }
+  }
+
+  /**
+   * Creates {@code chunks} work units for files named file0, file1, ..., each constructed with
+   * offsets 0 and {@link #FILE_SIZE} and its own random byte map.
+   */
+  private List<CompleteFileWork> generateChunks(int chunks) {
+    List<CompleteFileWork> chunkList = Lists.newArrayList();
+    for (int i = 0; i < chunks; i++) {
+      CompleteFileWork chunk = new CompleteFileWork(createByteMap(), 0, FILE_SIZE, "file" + i);
+      chunkList.add(chunk);
+    }
+    return chunkList;
+  }
+
+  /**
+   * Builds a byte map recording {@link #FILE_SIZE} bytes on {@link #ENDPOINTS_PER_CHUNK} distinct,
+   * randomly chosen endpoints. Requires {@code endpoints.size() >= ENDPOINTS_PER_CHUNK}; otherwise
+   * the selection loop can never finish.
+   *
+   * <p>NOTE(review): the selection is unseeded, so a failing layout cannot be replayed; consider a
+   * seeded random source if this test ever turns flaky.
+   */
+  private EndpointByteMap createByteMap() {
+    EndpointByteMap endpointByteMap = new EndpointByteMapImpl();
+    Set<DrillbitEndpoint> usedEndpoints = Sets.newHashSet();
+    while (usedEndpoints.size() < ENDPOINTS_PER_CHUNK) {
+      usedEndpoints.add(getRandom(endpoints));
+    }
+    for (DrillbitEndpoint ep : usedEndpoints) {
+      endpointByteMap.add(ep, FILE_SIZE);
+    }
+    return endpointByteMap;
+  }
+
+  /** Returns a uniformly chosen element of a non-empty list. */
+  private static <T> T getRandom(List<T> list) {
+    int index = ThreadLocalRandom.current().nextInt(list.size());
+    return list.get(index);
+  }
+}