You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2020/04/13 23:06:18 UTC
[hive] branch master updated: HIVE-23029: LLAP: Shuffle Handler
should support Index Cache configuration (Rajesh Balamohan,
reviewed by Ashutosh Chauhan)
This is an automated email from the ASF dual-hosted git repository.
rbalamohan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d6948a2 HIVE-23029: LLAP: Shuffle Handler should support Index Cache configuration (Rajesh Balamohan, reviewed by Ashutosh Chauhan)
d6948a2 is described below
commit d6948a28ab3e34e5116591a60a96bdf031185e47
Author: Rajesh Balamohan <rb...@apache.org>
AuthorDate: Tue Apr 14 04:36:05 2020 +0530
HIVE-23029: LLAP: Shuffle Handler should support Index Cache configuration (Rajesh Balamohan, reviewed by Ashutosh Chauhan)
---
.../hive/llap/shufflehandler/IndexCache.java | 3 +-
.../hive/llap/shufflehandler/TestIndexCache.java | 336 +++++++++++++++++++++
2 files changed, 338 insertions(+), 1 deletion(-)
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
index 4de03f2..fb1bcfe 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/IndexCache.java
@@ -42,10 +42,11 @@ class IndexCache {
private final LinkedBlockingQueue<String> queue =
new LinkedBlockingQueue<String>();
+ public static final String INDEX_CACHE_MB = "llap.shuffle.indexcache.mb";
public IndexCache(Configuration conf) {
this.conf = conf;
- totalMemoryAllowed = 10 * 1024 * 1024;
+ totalMemoryAllowed = conf.getInt(INDEX_CACHE_MB, 10) * 1024 * 1024;
LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
}
diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/shufflehandler/TestIndexCache.java b/llap-server/src/test/org/apache/hadoop/hive/llap/shufflehandler/TestIndexCache.java
new file mode 100644
index 0000000..851e9c0
--- /dev/null
+++ b/llap-server/src/test/org/apache/hadoop/hive/llap/shufflehandler/TestIndexCache.java
@@ -0,0 +1,336 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.llap.shufflehandler;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Random;
+import java.util.zip.CRC32;
+import java.util.zip.CheckedOutputStream;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import org.apache.tez.runtime.library.common.Constants;
+import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.hive.llap.shufflehandler.IndexCache.INDEX_CACHE_MB;
+
+import static org.junit.Assert.*;
+
/**
 * Tests for {@link IndexCache}: LRU eviction under the configured memory cap
 * ({@code llap.shuffle.indexcache.mb}), checksum validation, reduce-id range
 * checks, and concurrent create/remove races.
 */
public class TestIndexCache {
  // Per-test configuration; tests set INDEX_CACHE_MB on it before building a cache.
  private Configuration conf;
  // Raw local file system used to write synthetic index files.
  private FileSystem fs;
  // Base directory (under test.build.data, default /tmp) holding the index files.
  private Path p;
+ @Before
+ public void setUp() throws IOException {
+ conf = new Configuration();
+ fs = FileSystem.getLocal(conf).getRaw();
+ p = new Path(System.getProperty("test.build.data", "/tmp"),
+ "cache").makeQualified(fs.getUri(), fs.getWorkingDirectory());
+ }
+
  /**
   * Verifies least-recently-cached eviction: fills a 1 MB cache with index
   * files, confirms cached entries survive deletion of their backing files,
   * then adds one more entry and checks that only the oldest entry was
   * pushed out (its lookup must fail with a FileNotFoundException cause).
   */
  @Test
  public void testLRCPolicy() throws Exception {
    Random r = new Random();
    long seed = r.nextLong();
    r.setSeed(seed);
    // Print the seed so a random-dependent failure can be reproduced.
    System.out.println("seed: " + seed);
    fs.delete(p, true);
    conf.setInt(INDEX_CACHE_MB, 1);
    final int partsPerMap = 1000;
    // 24 bytes per index record (3 longs), so bytesPerFile is one map's index size.
    final int bytesPerFile = partsPerMap * 24;
    IndexCache cache = new IndexCache(conf);

    // fill cache: write files until the total reaches the 1 MB cap, caching
    // each one under its size rendered in base 36.
    int totalsize = bytesPerFile;
    for (; totalsize < 1024 * 1024; totalsize += bytesPerFile) {
      Path f = new Path(p, Integer.toString(totalsize, 36));
      writeFile(fs, f, totalsize, partsPerMap);
      TezIndexRecord rec = cache.getIndexInformation(
          Integer.toString(totalsize, 36), r.nextInt(partsPerMap), f,
          UserGroupInformation.getCurrentUser().getShortUserName());
      checkRecord(rec, totalsize);
    }

    // delete files, ensure cache retains all elem: lookups below must be
    // served from memory since the backing files are gone.
    for (FileStatus stat : fs.listStatus(p)) {
      fs.delete(stat.getPath(),true);
    }
    for (int i = bytesPerFile; i < 1024 * 1024; i += bytesPerFile) {
      Path f = new Path(p, Integer.toString(i, 36));
      TezIndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
          r.nextInt(partsPerMap), f,
          UserGroupInformation.getCurrentUser().getShortUserName());
      checkRecord(rec, i);
    }

    // push oldest (bytesPerFile) out of cache by inserting one more entry
    // beyond the memory cap.
    Path f = new Path(p, Integer.toString(totalsize, 36));
    writeFile(fs, f, totalsize, partsPerMap);
    cache.getIndexInformation(Integer.toString(totalsize, 36),
        r.nextInt(partsPerMap), f,
        UserGroupInformation.getCurrentUser().getShortUserName());
    fs.delete(f, false);

    // oldest fails to read, or error: the evicted entry forces a re-read of
    // a deleted file, which must surface as an IOException caused by FNF.
    // NOTE(review): this Path name omits the radix-36 used everywhere else;
    // harmless here because the file must not exist either way — confirm.
    boolean fnf = false;
    try {
      cache.getIndexInformation(Integer.toString(bytesPerFile, 36),
          r.nextInt(partsPerMap), new Path(p, Integer.toString(bytesPerFile)),
          UserGroupInformation.getCurrentUser().getShortUserName());
    } catch (IOException e) {
      if (e.getCause() == null ||
          !(e.getCause() instanceof FileNotFoundException)) {
        throw e;
      }
      else {
        fnf = true;
      }
    }
    if (!fnf)
      fail("Failed to push out last entry");
    // should find all the other entries: everything except the oldest must
    // still be resident despite the deleted files.
    for (int i = bytesPerFile << 1; i < 1024 * 1024; i += bytesPerFile) {
      TezIndexRecord rec = cache.getIndexInformation(Integer.toString(i, 36),
          r.nextInt(partsPerMap), new Path(p, Integer.toString(i, 36)),
          UserGroupInformation.getCurrentUser().getShortUserName());
      checkRecord(rec, i);
    }
    // The newest entry (the one that triggered eviction) must also be cached.
    TezIndexRecord rec = cache.getIndexInformation(Integer.toString(totalsize, 36),
        r.nextInt(partsPerMap), f,
        UserGroupInformation.getCurrentUser().getShortUserName());

    checkRecord(rec, totalsize);
  }
+
  /**
   * Verifies checksum validation: writes an index file where only every
   * third record goes through the checksummed stream, so the CRC appended
   * at the end does not match the data; the cache read must fail with an
   * IOException caused by a ChecksumException.
   */
  @Test
  public void testBadIndex() throws Exception {
    final int parts = 30;
    fs.delete(p, true);
    conf.setInt(INDEX_CACHE_MB, 1);
    IndexCache cache = new IndexCache(conf);

    Path f = new Path(p, "badindex");
    FSDataOutputStream out = fs.create(f, false);
    CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
    DataOutputStream dout = new DataOutputStream(iout);
    for (int i = 0; i < parts; ++i) {
      for (int j = 0; j < Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
        if (0 == (i % 3)) {
          // Folded into the CRC via the checksummed wrapper.
          dout.writeLong(i);
        } else {
          // Deliberately bypasses the CRC: same bytes land in the file but
          // the recorded checksum will not cover them.
          out.writeLong(i);
        }
      }
    }
    // Append the (now incorrect) checksum of only the dout-written longs.
    out.writeLong(iout.getChecksum().getValue());
    dout.close();
    try {
      cache.getIndexInformation("badindex", 7, f,
          UserGroupInformation.getCurrentUser().getShortUserName());
      fail("Did not detect bad checksum");
    } catch (IOException e) {
      // Only a checksum failure is acceptable; anything else is a real error.
      if (!(e.getCause() instanceof ChecksumException)) {
        throw e;
      }
    }
  }
+
+ @Test
+ public void testInvalidReduceNumberOrLength() throws Exception {
+ fs.delete(p, true);
+ conf.setInt(INDEX_CACHE_MB, 1);
+ final int partsPerMap = 1000;
+ final int bytesPerFile = partsPerMap * 24;
+ IndexCache cache = new IndexCache(conf);
+
+ // fill cache
+ Path feq = new Path(p, "invalidReduceOrPartsPerMap");
+ writeFile(fs, feq, bytesPerFile, partsPerMap);
+
+ // Number of reducers should always be less than partsPerMap as reducer
+ // numbers start from 0 and there cannot be more reducer than parts
+
+ try {
+ // Number of reducers equal to partsPerMap
+ cache.getIndexInformation("reduceEqualPartsPerMap",
+ partsPerMap, // reduce number == partsPerMap
+ feq, UserGroupInformation.getCurrentUser().getShortUserName());
+ fail("Number of reducers equal to partsPerMap did not fail");
+ } catch (Exception e) {
+ if (!(e instanceof IOException)) {
+ throw e;
+ }
+ }
+
+ try {
+ // Number of reducers more than partsPerMap
+ cache.getIndexInformation(
+ "reduceMorePartsPerMap",
+ partsPerMap + 1, // reduce number > partsPerMap
+ feq, UserGroupInformation.getCurrentUser().getShortUserName());
+ fail("Number of reducers more than partsPerMap did not fail");
+ } catch (Exception e) {
+ if (!(e instanceof IOException)) {
+ throw e;
+ }
+ }
+ }
+
  /**
   * Races getIndexInformation against removeMap on the same entry and checks
   * that the cache's memory accounting stays consistent afterwards.
   */
  @Test
  public void testRemoveMap() throws Exception {
    // This test case uses two threads to call getIndexInformation and
    // removeMap concurrently, in order to construct a race condition.
    // This test case may not be repeatable. But on my macbook this test
    // fails with probability of 100% on code before MAPREDUCE-2541,
    // so it is repeatable in practice.
    fs.delete(p, true);
    conf.setInt(INDEX_CACHE_MB, 10);
    // Make a big file so removeMapThread almost surely runs faster than
    // getInfoThread
    final int partsPerMap = 100000;
    final int bytesPerFile = partsPerMap * 24;
    final IndexCache cache = new IndexCache(conf);

    final Path big = new Path(p, "bigIndex");
    final String user =
        UserGroupInformation.getCurrentUser().getShortUserName();
    writeFile(fs, big, bytesPerFile, partsPerMap);

    // run multiple times to give the race a chance to manifest
    for (int i = 0; i < 20; ++i) {
      Thread getInfoThread = new Thread() {
        @Override
        public void run() {
          try {
            // NOTE(review): partsPerMap is one past the last valid reduce id
            // (see testInvalidReduceNumberOrLength), so this may throw; the
            // exception is deliberately ignored — only memory accounting
            // matters here.
            cache.getIndexInformation("bigIndex", partsPerMap, big, user);
          } catch (Exception e) {
            // should not be here
          }
        }
      };
      Thread removeMapThread = new Thread() {
        @Override
        public void run() {
          cache.removeMap("bigIndex");
        }
      };
      // Alternate start order to vary which side wins the race.
      if (i%2==0) {
        getInfoThread.start();
        removeMapThread.start();
      } else {
        removeMapThread.start();
        getInfoThread.start();
      }
      getInfoThread.join();
      removeMapThread.join();
      // Accounting invariant must hold regardless of interleaving.
      assertEquals(true, cache.checkTotalMemoryUsed());
    }
  }
+
+ @Test
+ public void testCreateRace() throws Exception {
+ fs.delete(p, true);
+ conf.setInt(INDEX_CACHE_MB, 1);
+ final int partsPerMap = 1000;
+ final int bytesPerFile = partsPerMap * 24;
+ final IndexCache cache = new IndexCache(conf);
+
+ final Path racy = new Path(p, "racyIndex");
+ final String user =
+ UserGroupInformation.getCurrentUser().getShortUserName();
+ writeFile(fs, racy, bytesPerFile, partsPerMap);
+
+ // run multiple instances
+ Thread[] getInfoThreads = new Thread[50];
+ for (int i = 0; i < 50; i++) {
+ getInfoThreads[i] = new Thread() {
+ @Override
+ public void run() {
+ try {
+ cache.getIndexInformation("racyIndex", partsPerMap, racy, user);
+ cache.removeMap("racyIndex");
+ } catch (Exception e) {
+ // should not be here
+ }
+ }
+ };
+ }
+
+ for (int i = 0; i < 50; i++) {
+ getInfoThreads[i].start();
+ }
+
+ final Thread mainTestThread = Thread.currentThread();
+
+ Thread timeoutThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ Thread.sleep(15000);
+ mainTestThread.interrupt();
+ } catch (InterruptedException ie) {
+ // we are done;
+ }
+ }
+ };
+
+ for (int i = 0; i < 50; i++) {
+ try {
+ getInfoThreads[i].join();
+ } catch (InterruptedException ie) {
+ // we haven't finished in time. Potential deadlock/race.
+ fail("Unexpectedly long delay during concurrent cache entry creations");
+ }
+ }
+ // stop the timeoutThread. If we get interrupted before stopping, there
+ // must be something wrong, although it wasn't a deadlock. No need to
+ // catch and swallow.
+ timeoutThread.interrupt();
+ }
+
+ private static void checkRecord(TezIndexRecord rec, long fill) {
+ assertEquals(fill, rec.getStartOffset());
+ assertEquals(fill, rec.getRawLength());
+ assertEquals(fill, rec.getPartLength());
+ }
+
+ private static void writeFile(FileSystem fs, Path f, long fill, int parts)
+ throws IOException {
+ FSDataOutputStream out = fs.create(f, false);
+ CheckedOutputStream iout = new CheckedOutputStream(out, new CRC32());
+ DataOutputStream dout = new DataOutputStream(iout);
+ for (int i = 0; i < parts; ++i) {
+ for (int j = 0; j < Constants.MAP_OUTPUT_INDEX_RECORD_LENGTH / 8; ++j) {
+ dout.writeLong(fill);
+ }
+ }
+ out.writeLong(iout.getChecksum().getValue());
+ dout.close();
+ }
+}