You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2016/09/19 23:11:21 UTC
hive git commit: HIVE-14680 : retain consistent splits /during/ (as
opposed to across) LLAP failures on top of HIVE-14589 (Sergey Shelukhin,
reviewed by Siddharth Seth)
Repository: hive
Updated Branches:
refs/heads/master 4340d4619 -> 83ef6f927
HIVE-14680 : retain consistent splits /during/ (as opposed to across) LLAP failures on top of HIVE-14589 (Sergey Shelukhin, reviewed by Siddharth Seth)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/83ef6f92
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/83ef6f92
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/83ef6f92
Branch: refs/heads/master
Commit: 83ef6f9272d71e1918ffc89635709b4f81e8aba9
Parents: 4340d46
Author: Sergey Shelukhin <se...@apache.org>
Authored: Mon Sep 19 16:11:16 2016 -0700
Committer: Sergey Shelukhin <se...@apache.org>
Committed: Mon Sep 19 16:11:16 2016 -0700
----------------------------------------------------------------------
.../hive/llap/registry/ServiceInstanceSet.java | 7 +-
.../registry/impl/InactiveServiceInstance.java | 77 ++++++++++
.../registry/impl/LlapFixedRegistryImpl.java | 2 +-
.../impl/LlapZookeeperRegistryImpl.java | 34 ++++-
.../daemon/services/impl/LlapWebServices.java | 2 +-
.../tez/HostAffinitySplitLocationProvider.java | 80 +++++++---
.../apache/hadoop/hive/ql/exec/tez/Utils.java | 8 +-
.../TestHostAffinitySplitLocationProvider.java | 150 +++++++++++++++----
.../apache/hadoop/hive/serde2/SerDeUtils.java | 11 ++
9 files changed, 306 insertions(+), 65 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
index 13b668d..1e8c895 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceInstanceSet.java
@@ -14,7 +14,6 @@
package org.apache.hadoop.hive.llap.registry;
import java.util.Collection;
-import java.util.List;
import java.util.Set;
public interface ServiceInstanceSet {
@@ -32,9 +31,11 @@ public interface ServiceInstanceSet {
/**
* Gets a list containing all the instances. This list has the same iteration order across
* different processes, assuming the list of registry entries is the same.
- * @return
+ * @param consistentIndexes if true, also try to maintain the same exact index for each node
+ * across calls, by inserting inactive instances to replace the
+ * removed ones.
*/
- public Collection<ServiceInstance> getAllInstancesOrdered();
+ public Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes);
/**
* Get an instance by worker identity.
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
new file mode 100644
index 0000000..79b7d51
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/InactiveServiceInstance.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.registry.impl;
+
+import java.util.Map;
+
+import org.apache.hadoop.hive.llap.registry.ServiceInstance;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class InactiveServiceInstance implements ServiceInstance {
+ private final String name;
+ public InactiveServiceInstance(String name) {
+ this.name = name;
+ }
+
+ @Override
+ public String getWorkerIdentity() {
+ return name;
+ }
+
+ @Override
+ public boolean isAlive() {
+ return false;
+ }
+
+ @Override
+ public String getHost() {
+ return null;
+ }
+
+ @Override
+ public int getRpcPort() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getManagementPort() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getShufflePort() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getServicesAddress() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getOutputFormatPort() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<String, String> getProperties() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Resource getResource() {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
index de4d7f2..bbfcbf6 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java
@@ -228,7 +228,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry {
}
@Override
- public List<ServiceInstance> getAllInstancesOrdered() {
+ public List<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) {
List<ServiceInstance> list = new LinkedList<>();
list.addAll(instances.values());
Collections.sort(list, new Comparator<ServiceInstance>() {
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 5e17ebf..59f7c9e 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -542,8 +542,8 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
}
@Override
- public Collection<ServiceInstance> getAllInstancesOrdered() {
- Map<String, String> slotByWorker = new HashMap<String, String>();
+ public Collection<ServiceInstance> getAllInstancesOrdered(boolean consistentIndexes) {
+ Map<String, Long> slotByWorker = new HashMap<String, Long>();
List<ServiceInstance> unsorted = new LinkedList<ServiceInstance>();
for (ChildData childData : instancesCache.getCurrentData()) {
if (childData == null) continue;
@@ -560,21 +560,45 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry {
" Ignoring from current instances list..", childData.getPath());
}
} else if (nodeName.startsWith(SLOT_PREFIX)) {
- slotByWorker.put(extractWorkerIdFromSlot(childData), nodeName);
+ slotByWorker.put(extractWorkerIdFromSlot(childData),
+ Long.parseLong(nodeName.substring(SLOT_PREFIX.length())));
} else {
LOG.info("Ignoring unknown node {}", childData.getPath());
}
}
- TreeMap<String, ServiceInstance> sorted = new TreeMap<>();
+ TreeMap<Long, ServiceInstance> sorted = new TreeMap<>();
+ long maxSlot = Long.MIN_VALUE;
for (ServiceInstance worker : unsorted) {
- String slot = slotByWorker.get(worker.getWorkerIdentity());
+ Long slot = slotByWorker.get(worker.getWorkerIdentity());
if (slot == null) {
LOG.info("Unknown slot for {}", worker.getWorkerIdentity());
continue;
}
+ maxSlot = Math.max(maxSlot, slot);
sorted.put(slot, worker);
}
+
+ if (consistentIndexes) {
+ // Add dummy instances to all slots where LLAPs are missing. They are collected in a separate map because we cannot insert into the TreeMap while iterating over its keys.
+ TreeMap<Long, ServiceInstance> dummies = new TreeMap<>();
+ Iterator<Long> keyIter = sorted.keySet().iterator();
+ long expected = 0;
+ Long ts = null;
+ while (keyIter.hasNext()) {
+ Long slot = keyIter.next();
+ assert slot >= expected;
+ while (slot > expected) {
+ if (ts == null) {
+ ts = System.nanoTime(); // Inactive nodes restart every call!
+ }
+ dummies.put(expected, new InactiveServiceInstance("inactive-" + expected + "-" + ts));
+ ++expected;
+ }
+ ++expected;
+ }
+ sorted.putAll(dummies);
+ }
return sorted.values();
}
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
index f85bbf2..0614355 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/services/impl/LlapWebServices.java
@@ -226,7 +226,7 @@ public class LlapWebServices extends AbstractService {
}
jg.writeStringField("identity", registry.getWorkerIdentity());
jg.writeArrayFieldStart("peers");
- for (ServiceInstance s : registry.getInstances().getAllInstancesOrdered()) {
+ for (ServiceInstance s : registry.getInstances().getAllInstancesOrdered(false)) {
jg.writeStartObject();
jg.writeStringField("identity", s.getWorkerIdentity());
jg.writeStringField("host", s.getHost());
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
index aafc27e..dcb985f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HostAffinitySplitLocationProvider.java
@@ -15,10 +15,13 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.util.List;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.hash.Hashing;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.split.SplitLocationProvider;
@@ -39,46 +42,83 @@ import org.slf4j.LoggerFactory;
*/
public class HostAffinitySplitLocationProvider implements SplitLocationProvider {
- private final Logger LOG = LoggerFactory.getLogger(HostAffinitySplitLocationProvider.class);
+ private final static Logger LOG = LoggerFactory.getLogger(
+ HostAffinitySplitLocationProvider.class);
private final boolean isDebugEnabled = LOG.isDebugEnabled();
- private final String[] knownLocations;
+ private final List<String> locations;
- public HostAffinitySplitLocationProvider(String[] knownLocations) {
- Preconditions.checkState(knownLocations != null && knownLocations.length != 0,
+ public HostAffinitySplitLocationProvider(List<String> knownLocations) {
+ Preconditions.checkState(knownLocations != null && !knownLocations.isEmpty(),
HostAffinitySplitLocationProvider.class.getName() +
- "needs at least 1 location to function");
- this.knownLocations = knownLocations;
+ " needs at least 1 location to function");
+ this.locations = knownLocations;
}
@Override
public String[] getLocations(InputSplit split) throws IOException {
- if (split instanceof FileSplit) {
- FileSplit fsplit = (FileSplit) split;
- int index = chooseBucket(fsplit.getPath().toString(), fsplit.getStart());
- if (isDebugEnabled) {
- LOG.debug(
- "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart() + ", length=" +
- fsplit.getLength() + " mapped to index=" + index + ", location=" +
- knownLocations[index]);
- }
- return new String[]{knownLocations[index]};
- } else {
+ if (!(split instanceof FileSplit)) {
if (isDebugEnabled) {
LOG.debug("Split: " + split + " is not a FileSplit. Using default locations");
}
return split.getLocations();
}
+ FileSplit fsplit = (FileSplit) split;
+ String splitDesc = "Split at " + fsplit.getPath() + " with offset= " + fsplit.getStart()
+ + ", length=" + fsplit.getLength();
+ String location = locations.get(determineLocation(
+ locations, fsplit.getPath().toString(), fsplit.getStart(), splitDesc));
+ return (location != null) ? new String[] { location } : null;
}
+ @VisibleForTesting
+ public static int determineLocation(
+ List<String> locations, String path, long start, String desc) {
+ byte[] bytes = getHashInputForSplit(path, start);
+ long hash1 = hash1(bytes);
+ int index = Hashing.consistentHash(hash1, locations.size());
+ String location = locations.get(index);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(desc + " mapped to index=" + index + ", location=" + location);
+ }
+ int iter = 1;
+ long hash2 = 0;
+ // Since our probing method is totally bogus, give up after some time.
+ while (location == null && iter < locations.size() * 2) {
+ if (iter == 1) {
+ hash2 = hash2(bytes);
+ }
+ // Note that this is not real double hashing since we have consistent hash on top.
+ index = Hashing.consistentHash(hash1 + iter * hash2, locations.size());
+ location = locations.get(index);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(desc + " remapped to index=" + index + ", location=" + location);
+ }
+ ++iter;
+ }
+ return index;
+ }
- private int chooseBucket(String path, long startOffset) throws IOException {
+ private static byte[] getHashInputForSplit(String path, long start) {
// Explicitly using only the start offset of a split, and not the length. Splits generated on
// block boundaries and stripe boundaries can vary slightly. Try hashing both to the same node.
// There is the drawback of potentially hashing the same data on multiple nodes though, when a
// large split is sent to 1 node, and a second invocation uses smaller chunks of the previous
// large split and send them to different nodes.
- long hashCode = ((startOffset >> 2) * 37) ^ Murmur3.hash64(path.getBytes());
- return Hashing.consistentHash(hashCode, knownLocations.length);
+ byte[] pathBytes = path.getBytes();
+ byte[] allBytes = new byte[pathBytes.length + 8];
+ System.arraycopy(pathBytes, 0, allBytes, 0, pathBytes.length);
+ SerDeUtils.writeLong(allBytes, pathBytes.length, start >> 3);
+ return allBytes;
+ }
+
+ private static long hash1(byte[] bytes) {
+ final int PRIME = 104729; // Same as hash64's default seed.
+ return Murmur3.hash64(bytes, 0, bytes.length, PRIME);
+ }
+
+ private static long hash2(byte[] bytes) {
+ final int PRIME = 1366661;
+ return Murmur3.hash64(bytes, 0, bytes.length, PRIME);
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
index 2e9918e..113aa49 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/Utils.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.exec.tez;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import org.apache.commons.lang.ArrayUtils;
@@ -41,15 +42,14 @@ public class Utils {
serviceRegistry = LlapRegistryService.getClient(conf);
Collection<ServiceInstance> serviceInstances =
- serviceRegistry.getInstances().getAllInstancesOrdered();
- String[] locations = new String[serviceInstances.size()];
- int i = 0;
+ serviceRegistry.getInstances().getAllInstancesOrdered(true);
+ ArrayList<String> locations = new ArrayList<>(serviceInstances.size());
for (ServiceInstance serviceInstance : serviceInstances) {
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + serviceInstance.getWorkerIdentity() + " with hostname=" +
serviceInstance.getHost() + " to list for split locations");
}
- locations[i++] = serviceInstance.getHost();
+ locations.add(serviceInstance.getHost());
}
splitLocationProvider = new HostAffinitySplitLocationProvider(locations);
} else {
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
index 7ed3df1..f5ca623 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestHostAffinitySplitLocationProvider.java
@@ -19,15 +19,22 @@ import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.math.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,20 +42,20 @@ public class TestHostAffinitySplitLocationProvider {
private final Logger LOG = LoggerFactory.getLogger(TestHostAffinitySplitLocationProvider.class);
- private static final String[] locations = new String[5];
+ private static final List<String> locations = new ArrayList<>();
private static final Set<String> locationsSet = new HashSet<>();
- private static final String[] executorLocations = new String[9];
+ private static final List<String> executorLocations = new ArrayList<>();
private static final Set<String> executorLocationsSet = new HashSet<>();
static {
for (int i = 0 ; i < 5 ; i++) {
- locations[i] = "location" + i;
- locationsSet.add(locations[i]);
+ locations.add("location" + i);
+ locationsSet.add(locations.get(i));
}
for (int i = 0 ; i < 9 ; i++) {
- executorLocations[i] = "execLocation" + i;
- executorLocationsSet.add(executorLocations[i]);
+ executorLocations.add("execLocation" + i);
+ executorLocationsSet.add(executorLocations.get(i));
}
}
@@ -58,20 +65,20 @@ public class TestHostAffinitySplitLocationProvider {
HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
- InputSplit inputSplit1 = createMockInputSplit(new String[] {locations[0], locations[1]});
- InputSplit inputSplit2 = createMockInputSplit(new String[] {locations[2], locations[3]});
+ InputSplit inputSplit1 = createMockInputSplit(new String[] {locations.get(0), locations.get(1)});
+ InputSplit inputSplit2 = createMockInputSplit(new String[] {locations.get(2), locations.get(3)});
- assertArrayEquals(new String[] {locations[0], locations[1]}, locationProvider.getLocations(inputSplit1));
- assertArrayEquals(new String[] {locations[2], locations[3]}, locationProvider.getLocations(inputSplit2));
+ assertArrayEquals(new String[] {locations.get(0), locations.get(1)}, locationProvider.getLocations(inputSplit1));
+ assertArrayEquals(new String[] {locations.get(2), locations.get(3)}, locationProvider.getLocations(inputSplit2));
}
@Test (timeout = 5000)
public void testOrcSplitsBasic() throws IOException {
HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
- InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations[0], locations[1]});
- InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations[2], locations[3]});
- InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new String[] {locations[0], locations[3]});
+ InputSplit os1 = createMockFileSplit(true, "path1", 0, 1000, new String[] {locations.get(0), locations.get(1)});
+ InputSplit os2 = createMockFileSplit(true, "path2", 0, 2000, new String[] {locations.get(2), locations.get(3)});
+ InputSplit os3 = createMockFileSplit(true, "path3", 1000, 2000, new String[] {locations.get(0), locations.get(3)});
String[] retLoc1 = locationProvider.getLocations(os1);
String[] retLoc2 = locationProvider.getLocations(os2);
@@ -94,25 +101,18 @@ public class TestHostAffinitySplitLocationProvider {
@Test (timeout = 10000)
public void testConsistentHashing() throws IOException {
final int LOC_COUNT = 20, MIN_LOC_COUNT = 4, SPLIT_COUNT = 100;
-
- String[] locations = new String[LOC_COUNT];
- for (int i = 0; i < locations.length; ++i) {
- locations[i] = String.valueOf(i);
- }
- InputSplit[] splits = new InputSplit[SPLIT_COUNT];
- for (int i = 0; i < splits.length; ++i) {
- splits[i] = createMockFileSplit(true, "path" + i, 0, 1000, new String[] {});
- }
+ List<String> locations = createLocations(LOC_COUNT);
+ InputSplit[] splits = createSplits(SPLIT_COUNT);
StringBuilder failBuilder = new StringBuilder("\n");
String[] lastLocations = new String[splits.length];
double movedRatioSum = 0, newRatioSum = 0,
movedRatioWorst = 0, newRatioWorst = Double.MAX_VALUE;
- for (int locs = MIN_LOC_COUNT; locs <= locations.length; ++locs) {
- String[] partLoc = Arrays.copyOf(locations, locs);
+ for (int locs = MIN_LOC_COUNT; locs <= locations.size(); ++locs) {
+ List<String> partLoc = locations.subList(0, locs);
HostAffinitySplitLocationProvider lp = new HostAffinitySplitLocationProvider(partLoc);
int moved = 0, newLoc = 0;
- String newNode = partLoc[locs - 1];
+ String newNode = partLoc.get(locs - 1);
for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
String[] splitLocations = lp.getLocations(splits[splitIx]);
assertEquals(1, splitLocations.length);
@@ -137,18 +137,106 @@ public class TestHostAffinitySplitLocationProvider {
newRatioWorst = Math.min(newLoc / minNew, newRatioWorst);
logBadRatios(failBuilder, moved, newLoc, msgTail, movedMsg, newMsg, maxMoved, minNew);
}
- int count = locations.length - MIN_LOC_COUNT;
+ int count = locations.size() - MIN_LOC_COUNT;
double moveRatioAvg = movedRatioSum / count, newRatioAvg = newRatioSum / count;
String errorMsg = "Move counts: average " + moveRatioAvg + ", worst " + movedRatioWorst
+ "; assigned to new node: average " + newRatioAvg + ", worst " + newRatioWorst;
LOG.info(errorMsg);
// Give it a LOT of slack, since on low numbers consistent hashing is very imprecise.
if (moveRatioAvg > 1.2f || newRatioAvg < 0.8f
- || movedRatioWorst > 1.5f || newRatioWorst < 0.5f) {
+ || movedRatioWorst > 1.67f || newRatioWorst < 0.5f) {
fail(errorMsg + "; example failures: " + failBuilder.toString());
}
}
+ public FileSplit[] createSplits(final int splitCount) throws IOException {
+ FileSplit[] splits = new FileSplit[splitCount];
+ for (int i = 0; i < splits.length; ++i) {
+ splits[i] = createMockFileSplit(true, "path" + i, 0, 1000, new String[] {});
+ }
+ return splits;
+ }
+
+ public List<String> createLocations(final int locCount) {
+ List<String> locations = new ArrayList<>(locCount);
+ for (int i = 0; i < locCount; ++i) {
+ locations.add(String.valueOf(i));
+ }
+ return locations;
+ }
+
+
+ @Test (timeout = 10000)
+ public void testConsistentHashingFallback() throws IOException {
+ final int LOC_COUNT_TO = 20, SPLIT_COUNT = 500, MAX_MISS_COUNT = 4,
+ LOC_COUNT_FROM = MAX_MISS_COUNT + 1;
+ FileSplit[] splits = createSplits(SPLIT_COUNT);
+ AtomicInteger errorCount = new AtomicInteger(0);
+ int cvErrorCount = 0;
+ for (int locs = LOC_COUNT_FROM; locs <= LOC_COUNT_TO; ++locs) {
+ int aboveAvgCount = 0;
+ double sum = 0;
+ double[] cvs = new double[MAX_MISS_COUNT + 1];
+ for (int missCount = 0; missCount <= MAX_MISS_COUNT; ++missCount) {
+ double cv = cvs[missCount] = testHashDistribution(locs, missCount, splits, errorCount);
+ sum += cv;
+ if (missCount > 0 && cv > sum / (missCount + 1)) {
+ ++aboveAvgCount;
+ }
+ }
+ if (aboveAvgCount > 2) {
+ LOG.info("CVs for " + locs + " locations aren't to our liking: " + Arrays.toString(cvs));
+ ++cvErrorCount;
+ }
+ }
+ assertTrue("Found " + errorCount.get() + " abnormalities", errorCount.get() < 3);
+ // TODO: the way we add hash fns does exhibit some irregularities.
+ // Seems like the 3rd iter has a better distribution in many cases, even better
+ // than the original hash. That trips the "above MA" criterion, even if the rest is flat.
+ assertTrue("Found " + cvErrorCount + " abnormalities", cvErrorCount< 7);
+ }
+
+ private double testHashDistribution(int locs, final int missCount, FileSplit[] splits,
+ AtomicInteger errorCount) {
+ // This relies heavily on which List methods determineLocation calls and which it doesn't.
+ // We could do a wrapper with only size() and get() methods instead of List, to be sure.
+ @SuppressWarnings("unchecked")
+ List<String> partLocs = (List<String>)Mockito.mock(List.class);
+ Mockito.when(partLocs.size()).thenReturn(locs);
+ final AtomicInteger state = new AtomicInteger(0);
+ Mockito.when(partLocs.get(Mockito.anyInt())).thenAnswer(new Answer<String>() {
+ @Override
+ public String answer(InvocationOnMock invocation) throws Throwable {
+ return (state.getAndIncrement() == missCount) ? "not-null" : null;
+ }
+ });
+ int[] hitCounts = new int[locs];
+ for (int splitIx = 0; splitIx < splits.length; ++splitIx) {
+ state.set(0);
+ int index = HostAffinitySplitLocationProvider.determineLocation(partLocs,
+ splits[splitIx].getPath().toString(), splits[splitIx].getStart(), null);
+ ++hitCounts[index];
+ }
+ SummaryStatistics ss = new SummaryStatistics();
+ for (int hitCount : hitCounts) {
+ ss.addValue(hitCount);
+ }
+ // All of this is completely bogus and mostly captures the following function:
+ // f(output) = I-eyeballed-the(output) == they-look-ok.
+ // It's pretty much a golden file...
+ // The fact that stdev doesn't increase with increasing missCount is captured outside.
+ double avg = ss.getSum()/ss.getN(), stdev = ss.getStandardDeviation(), cv = stdev/avg;
+ double allowedMin = avg - 2.5 * stdev, allowedMax = avg + 2.5 * stdev;
+ if (allowedMin > ss.getMin() || allowedMax < ss.getMax() || cv > 0.22) {
+ LOG.info("The distribution for " + locs + " locations, " + missCount + " misses isn't to "
+ + "our liking: avg " + avg + ", stdev " + stdev + ", cv " + cv + ", min " + ss.getMin()
+ + ", max " + ss.getMax());
+ errorCount.incrementAndGet();
+ }
+ return cv;
+ }
+
+
private void logBadRatios(StringBuilder failBuilder, int moved, int newLoc, String msgTail,
String movedMsg, String newMsg, double maxMoved, double minNew) {
boolean logged = false;
@@ -170,10 +258,10 @@ public class TestHostAffinitySplitLocationProvider {
HostAffinitySplitLocationProvider locationProvider = new HostAffinitySplitLocationProvider(executorLocations);
// Same file, offset, different lengths
- InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations[0], locations[1]});
- InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new String[] {locations[0], locations[1]});
+ InputSplit os11 = createMockFileSplit(true, "path1", 0, 15000, new String[] {locations.get(0), locations.get(1)});
+ InputSplit os12 = createMockFileSplit(true, "path1", 0, 30000, new String[] {locations.get(0), locations.get(1)});
// Same file, different offset
- InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new String[] {locations[0], locations[1]});
+ InputSplit os13 = createMockFileSplit(true, "path1", 15000, 30000, new String[] {locations.get(0), locations.get(1)});
String[] retLoc11 = locationProvider.getLocations(os11);
String[] retLoc12 = locationProvider.getLocations(os12);
@@ -216,7 +304,7 @@ public class TestHostAffinitySplitLocationProvider {
return inputSplit;
}
- private InputSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
+ private FileSplit createMockFileSplit(boolean createOrcSplit, String fakePathString, long start,
long length, String[] locations) throws IOException {
FileSplit fileSplit;
if (createOrcSplit) {
http://git-wip-us.apache.org/repos/asf/hive/blob/83ef6f92/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
----------------------------------------------------------------------
diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
index 6e08dfd..7ffc964 100644
--- a/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
+++ b/serde/src/java/org/apache/hadoop/hive/serde2/SerDeUtils.java
@@ -567,4 +567,15 @@ public final class SerDeUtils {
public static Text transformTextFromUTF8(Text text, Charset targetCharset) {
return new Text(new String(text.getBytes(), 0, text.getLength()).getBytes(targetCharset));
}
+
+ public static void writeLong(byte[] writeBuffer, int offset, long value) {
+ writeBuffer[offset] = (byte) ((value >> 0) & 0xff);
+ writeBuffer[offset + 1] = (byte) ((value >> 8) & 0xff);
+ writeBuffer[offset + 2] = (byte) ((value >> 16) & 0xff);
+ writeBuffer[offset + 3] = (byte) ((value >> 24) & 0xff);
+ writeBuffer[offset + 4] = (byte) ((value >> 32) & 0xff);
+ writeBuffer[offset + 5] = (byte) ((value >> 40) & 0xff);
+ writeBuffer[offset + 6] = (byte) ((value >> 48) & 0xff);
+ writeBuffer[offset + 7] = (byte) ((value >> 56) & 0xff);
+ }
}