You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/08/24 23:50:45 UTC

hive git commit: HIVE-14574 : use consistent hashing for LLAP consistent splits to alleviate impact from cluster changes (Sergey Shelukhin, reviewed by Prasanth Jayachandran)

Repository: hive
Updated Branches:
  refs/heads/master ea3c79e4f -> c989605d9


HIVE-14574 : use consistent hashing for LLAP consistent splits to alleviate impact from cluster changes (Sergey Shelukhin, reviewed by Prasanth Jayachandran)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c989605d
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c989605d
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c989605d

Branch: refs/heads/master
Commit: c989605d995b63393fae834667549ec18b67475b
Parents: ea3c79e
Author: Sergey Shelukhin <se...@apache.org>
Authored: Wed Aug 24 16:45:45 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Wed Aug 24 16:50:24 2016 -0700

----------------------------------------------------------------------
 .../tez/HostAffinitySplitLocationProvider.java  | 26 +++---
 .../TestHostAffinitySplitLocationProvider.java  | 85 ++++++++++++++++++--
 2 files changed, 92 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c989605d/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index c06499e..aafc27e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -17,7 +17,8 @@ package org.apache.hadoop.hive.ql.exec.tez;
 import java.io.IOException;
 
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.io.DataOutputBuffer;
+import com.google.common.hash.Hashing;
+
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -54,9 +55,7 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
   public String[] getLocations(InputSplit split) throws IOException {
     if (split instanceof FileSplit) {
       FileSplit fsplit = (FileSplit) split;
-      long hash = generateHash(fsplit.getPath().toString(), fsplit.getStart());
-      int indexRaw = (int) (hash % knownLocations.length);
-      int index = Math.abs(indexRaw);
+      int index = chooseBucket(fsplit.getPath().toString(), fsplit.getStart());
       if (isDebugEnabled) {
         LOG.debug(
             "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" +
@@ -72,15 +71,14 @@ public class HostAffinitySplitLocationProvider implements SplitLocationProvider
     }
   }
 
-  private long generateHash(String path, long startOffset) throws IOException {
-    // Explicitly using only the start offset of a split, and not the length.
-    // Splits generated on block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
-    // There is the drawback of potentially hashing the same data on multiple nodes though, when a large split
-    // is sent to 1 node, and a second invocation uses smaller chunks of the previous large split and send them
-    // to different nodes.
-    DataOutputBuffer dob = new DataOutputBuffer();
-    dob.writeLong(startOffset);
-    dob.writeUTF(path);
-    return Murmur3.hash64(dob.getData(), 0, dob.getLength());
+
+  private int chooseBucket(String path, long startOffset) throws IOException {
+    // Explicitly using only the start offset of a split, and not the length. Splits generated on
+    // block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
+    // There is the drawback of potentially hashing the same data on multiple nodes though, when a
+    // large split is sent to 1 node, and a second invocation uses smaller chunks of the previous
+ // large split and sends them to different nodes.
+    long hashCode = ((startOffset >> 2) * 37) ^ Murmur3.hash64(path.getBytes());
+    return Hashing.consistentHash(hashCode, knownLocations.length);
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c989605d/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index d98a5ff..54f7363 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -14,15 +14,12 @@
 
 package org.apache.hadoop.hive.ql.exec.tez;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Set;
 
@@ -31,8 +28,11 @@ import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class TestHostAffinitySplitLocationProvider {
+  private final Logger LOG = LoggerFactory.getLogger(TestHostAffinitySplitLocationProvider.class);
 
 
   private static final String[] locations = new String[5];
@@ -90,6 +90,81 @@ public class TestHostAffinitySplitLocationProvider {
     assertTrue(executorLocationsSet.contains(retLoc3[0]));
   }
 
+
+  @Test (timeout = 10000)
+  public void testConsistentHashing() throws IOException {
+    final int LOC_COUNT = 20, MIN_LOC_COUNT = 4, SPLIT_COUNT = 100;
+
+    String[] locations = new String[LOC_COUNT];
+    for (int i = 0; i < locations.length; ++i) {
+      locations[i] = String.valueOf(i);
+    }
+    InputSplit[] splits = new InputSplit[SPLIT_COUNT];
+    for (int i = 0; i < splits.length; ++i) {
+      splits[i] = createMockFileSplit(true, "path" + i, 0, 1000, new String[] {});
+    }
+
+    StringBuilder failBuilder = new StringBuilder("\n");
+    String[] lastLocations = new String[splits.length];
+    double movedRatioSum = 0, newRatioSum = 0,
+        movedRatioWorst = 0, newRatioWorst = Double.MAX_VALUE;
+    for (int locs = MIN_LOC_COUNT; locs <= locations.length; ++locs) {
+      String[] partLoc = Arrays.copyOf(locations, locs);
+      HostAffinitySplitLocationProvider lp = new HostAffinitySplitLocationProvider(partLoc);
+      int moved = 0, newLoc = 0;
+      String newNode = partLoc[locs - 1];
+      for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
+        String[] splitLocations = lp.getLocations(splits[splitIx]);
+        assertEquals(1, splitLocations.length);
+        String splitLocation = splitLocations[0];
+        if (locs > MIN_LOC_COUNT && !splitLocation.equals(lastLocations[splitIx])) {
+          ++moved;
+        }
+        if (newNode.equals(splitLocation)) {
+          ++newLoc;
+        }
+        lastLocations[splitIx] = splitLocation;
+      }
+      if (locs == MIN_LOC_COUNT) continue;
+      String msgTail = " when going to " + locs + " locations";
+      String movedMsg = moved + " splits moved",
+          newMsg = newLoc + " splits went to the new node";
+      LOG.info(movedMsg + " and " + newMsg + msgTail);
+      double maxMoved = 1.0f * splits.length / locs, minNew = 1.0f * splits.length / locs;
+      movedRatioSum += Math.max(moved / maxMoved, 1f);
+      movedRatioWorst = Math.max(moved / maxMoved, movedRatioWorst);
+      newRatioSum += Math.min(newLoc / minNew, 1f);
+      newRatioWorst = Math.min(newLoc / minNew, newRatioWorst);
+      logBadRatios(failBuilder, moved, newLoc, msgTail, movedMsg, newMsg, maxMoved, minNew);
+    }
+    int count = locations.length - MIN_LOC_COUNT;
+    double moveRatioAvg = movedRatioSum / count, newRatioAvg = newRatioSum / count;
+    String errorMsg = "Move counts: average " + moveRatioAvg + ", worst " + movedRatioWorst
+        + "; assigned to new node: average " + newRatioAvg + ", worst " + newRatioWorst;
+    LOG.info(errorMsg);
+    // Give it a LOT of slack, since on low numbers consistent hashing is very imprecise.
+    if (moveRatioAvg > 1.2f || newRatioAvg < 0.8f
+        || movedRatioWorst > 1.5f || newRatioWorst < 0.5f) {
+      fail(errorMsg + "; example failures: " + failBuilder.toString());
+    }
+  }
+
+  private void logBadRatios(StringBuilder failBuilder, int moved, int newLoc, String msgTail,
+      String movedMsg, String newMsg, double maxMoved, double minNew) {
+    boolean logged = false;
+    if (moved > maxMoved * 1.33f) {
+      failBuilder.append(movedMsg).append(" (threshold ").append(maxMoved).append(") ");
+      logged = true;
+    }
+    if (newLoc < minNew * 0.75f) {
+      failBuilder.append(newMsg).append(" (threshold ").append(minNew).append(") ");
+      logged = true;
+    }
+    if (logged) {
+      failBuilder.append(msgTail).append(";\n");
+    }
+  }
+
   @Test (timeout = 5000)
   public void testOrcSplitsLocationAffinity() throws IOException {
     HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);