You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/24 23:50:45 UTC
hive git commit: HIVE-14574 : use consistent hashing for LLAP
consistent splits to alleviate impact from cluster changes (Sergey Shelukhin,
reviewed by Prasanth Jayachandran)
Repository: hive
Updated Branches:
refs/heads/master ea3c79e4f -> c989605d9
HIVE-14574 : use consistent hashing for LLAP consistent splits to alleviate impact from cluster changes (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c989605d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c989605d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c989605d
Branch: refs/heads/master
Commit: c989605d995b63393fae834667549ec18b67475b
Parents: ea3c79e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 24 16:45:45 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 24 16:50:24 2016 -0700
----------------------------------------------------------------------
.../tez/HostAffinitySplitLocationProvider.java | 26 +++---
.../TestHostAffinitySplitLocationProvider.java | 85 ++++++++++++++++++--
2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/c989605d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index c06499e..aafc27e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -17,7 +17,8 @@ package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.io.DataOutputBuffer;
+import com.google.common.hash.Hashing;
+
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -54,9 +55,7 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
public String[] getLocations(InputSplit split) throws IOException {
if (split instanceof FileSplit) {
FileSplit fsplit = (FileSplit) split;
- long hash = generateHash(fsplit.getPath().toString(), fsplit.getStart());
- int indexRaw = (int) (hash % knownLocations.length);
- int index = Math.abs(indexRaw);
+ int index = chooseBucket(fsplit.getPath().toString(), fsplit.getStart());
if (isDebugEnabled) {
LOG.debug(
"Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" +
@@ -72,15 +71,14 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
}
}
- private long generateHash(String path, long startOffset) throws IOException {
- // Explicitly using only the start offset of a split, and not the length.
- // Splits generated on block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
- // There is the drawback of potentially hashing the same data on multiple nodes though, when a large split
- // is sent to 1 node, and a second invocation uses smaller chunks of the previous large split and send them
- // to different nodes.
- DataOutputBuffer dob = new DataOutputBuffer();
- dob.writeLong(startOffset);
- dob.writeUTF(path);
- return Murmur3.hash64(dob.getData(), 0, dob.getLength());
+
+ /**
+ * Maps a split to an index into knownLocations using Guava's consistent hash, so that a change
+ * in the number of known locations reassigns only a fraction of the splits.
+ *
+ * @param path the split's file path; hashed as UTF-8 bytes
+ * @param startOffset the split's start offset (the split length is intentionally not hashed)
+ * @return a bucket index in [0, knownLocations.length)
+ */
+ private int chooseBucket(String path, long startOffset) throws IOException {
+ // Explicitly using only the start offset of a split, and not the length. Splits generated on
+ // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
+ // There is the drawback of potentially hashing the same data on multiple nodes though, when a
+ // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous
+ // large split and sends them to different nodes.
+ // Encode with an explicit charset: the no-arg getBytes() uses the JVM default charset, which
+ // would make non-ASCII paths hash (and be placed) differently on differently-configured JVMs.
+ byte[] pathBytes = path.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ long hashCode = ((startOffset >> 2) * 37) ^ Murmur3.hash64(pathBytes);
+ return Hashing.consistentHash(hashCode, knownLocations.length);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/c989605d/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index d98a5ff..54f7363 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -14,15 +14,12 @@
package org.apache.hadoop.hive.ql.exec.tez;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
@@ -31,8 +28,11 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class TestHostAffinitySplitLocationProvider {
+ private static final Logger LOG = LoggerFactory.getLogger(TestHostAffinitySplitLocationProvider.class);
private static final String[] locations = new String[5];
@@ -90,6 +90,81 @@ public class TestHostAffinitySplitLocationProvider {
assertTrue(executorLocationsSet.contains(retLoc3[0]));
}
+
+ /**
+ * Grows the location list one entry at a time, from MIN_LOC_COUNT up to LOC_COUNT. After each
+ * step it counts how many splits changed location and how many landed on the newest location,
+ * then compares both counts against splits/locations. The test fails only when the average or
+ * worst-case ratios fall outside a deliberately generous slack.
+ */
+ @Test (timeout = 10000)
+ public void testConsistentHashing() throws IOException {
+ final int LOC_COUNT = 20, MIN_LOC_COUNT = 4, SPLIT_COUNT = 100;
+
+ String[] allHosts = new String[LOC_COUNT];
+ for (int h = 0; h < allHosts.length; ++h) {
+ allHosts[h] = String.valueOf(h);
+ }
+ InputSplit[] splits = new InputSplit[SPLIT_COUNT];
+ for (int s = 0; s < splits.length; ++s) {
+ splits[s] = createMockFileSplit(true, "path" + s, 0, 1000, new String[] {});
+ }
+
+ StringBuilder failures = new StringBuilder("\n");
+ String[] previousHost = new String[splits.length];
+ double movedSum = 0;
+ double newSum = 0;
+ double movedWorst = 0;
+ double newWorst = Double.MAX_VALUE;
+ for (int hostCount = MIN_LOC_COUNT; hostCount <= allHosts.length; ++hostCount) {
+ String[] activeHosts = Arrays.copyOf(allHosts, hostCount);
+ HostAffinitySplitLocationProvider provider = new HostAffinitySplitLocationProvider(activeHosts);
+ String addedHost = activeHosts[hostCount - 1];
+ // The first configuration only records the initial placement; nothing can have moved yet.
+ boolean isBaseline = hostCount == MIN_LOC_COUNT;
+ int movedCount = 0;
+ int toAddedHost = 0;
+ for (int s = 0; s < splits.length; ++s) {
+ String[] assigned = provider.getLocations(splits[s]);
+ assertEquals(1, assigned.length);
+ String host = assigned[0];
+ if (!isBaseline && !host.equals(previousHost[s])) {
+ movedCount++;
+ }
+ if (addedHost.equals(host)) {
+ toAddedHost++;
+ }
+ previousHost[s] = host;
+ }
+ if (isBaseline) {
+ continue;
+ }
+ String msgTail = " when going to " + hostCount + " locations";
+ String movedMsg = movedCount + " splits moved";
+ String newMsg = toAddedHost + " splits went to the new node";
+ LOG.info(movedMsg + " and " + newMsg + msgTail);
+ // Ideal consistent hashing moves ~splits/locations splits, all of them onto the added host.
+ double expected = 1.0f * splits.length / hostCount;
+ double movedRatio = movedCount / expected;
+ double newRatio = toAddedHost / expected;
+ // Clamp the "better than ideal" side to 1 so good steps cannot mask bad ones in the average.
+ movedSum += Math.max(movedRatio, 1f);
+ movedWorst = Math.max(movedRatio, movedWorst);
+ newSum += Math.min(newRatio, 1f);
+ newWorst = Math.min(newRatio, newWorst);
+ logBadRatios(failures, movedCount, toAddedHost, msgTail, movedMsg, newMsg, expected, expected);
+ }
+ int count = allHosts.length - MIN_LOC_COUNT;
+ double moveRatioAvg = movedSum / count;
+ double newRatioAvg = newSum / count;
+ String summary = "Move counts: average " + moveRatioAvg + ", worst " + movedWorst
+ + "; assigned to new node: average " + newRatioAvg + ", worst " + newWorst;
+ LOG.info(summary);
+ // Give it a LOT of slack, since on low numbers consistent hashing is very imprecise.
+ boolean outsideSlack = moveRatioAvg > 1.2f || newRatioAvg < 0.8f
+ || movedWorst > 1.5f || newWorst < 0.5f;
+ if (outsideSlack) {
+ fail(summary + "; example failures: " + failures.toString());
+ }
+ }
+
+ /**
+ * Appends a human-readable description of one resize step to failBuilder when the step looks
+ * bad: either more than 1.33x maxMoved splits moved, or fewer than 0.75x minNew splits landed on
+ * the new node. Nothing is appended when both counts are within those limits.
+ */
+ private void logBadRatios(StringBuilder failBuilder, int moved, int newLoc, String msgTail,
+ String movedMsg, String newMsg, double maxMoved, double minNew) {
+ boolean tooManyMoved = moved > maxMoved * 1.33f;
+ boolean tooFewOnNew = newLoc < minNew * 0.75f;
+ if (!tooManyMoved && !tooFewOnNew) {
+ return;
+ }
+ if (tooManyMoved) {
+ failBuilder.append(movedMsg).append(" (threshold ").append(maxMoved).append(") ");
+ }
+ if (tooFewOnNew) {
+ failBuilder.append(newMsg).append(" (threshold ").append(minNew).append(") ");
+ }
+ // Terminate this step's entry with the shared context suffix.
+ failBuilder.append(msgTail).append(";\n");
+ }
+
@Test (timeout = 5000)
public void testOrcSplitsLocationAffinity() throws IOException {
HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);