You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@hive.apache.org by sp...@apache.org on 2017/05/30 18:44:03 UTC
hive git commit: HIVE-16665: Race condition in Utilities.GetInputPathsCallable --> createDummyFileForEmptyPartition (Sahil Takiar, reviewed by Sergio Pena, Vihang Karajgaonkar)
Repository: hive
Updated Branches:
refs/heads/master 4cd425132 -> 824b9c80b
HIVE-16665: Race condition in Utilities.GetInputPathsCallable --> createDummyFileForEmptyPartition (Sahil Takiar, reviewed by Sergio Pena, Vihang Karajgaonkar)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/824b9c80
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/824b9c80
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/824b9c80
Branch: refs/heads/master
Commit: 824b9c80b443dc4e2b9ad35214a23ac756e75234
Parents: 4cd4251
Author: Sahil Takiar <ta...@gmail.com>
Authored: Tue May 30 13:43:32 2017 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Tue May 30 13:43:32 2017 -0500
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/Utilities.java | 48 +++++----
.../hadoop/hive/ql/exec/TestUtilities.java | 104 +++++++++++++++++--
2 files changed, 122 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/824b9c80/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ebf1344..c70e1e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3110,22 +3110,30 @@ public final class Utilities {
}
List<Path> finalPathsToAdd = new LinkedList<>();
- List<Future<Path>> futures = new LinkedList<>();
+ Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>();
for (final Path path : pathsToAdd) {
- if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+ if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
throw new IOException("Operation is Canceled. ");
+ }
if (pool == null) {
- finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
+ Path newPath = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call();
+ updatePathForMapWork(newPath, work, path);
+ finalPathsToAdd.add(newPath);
} else {
- futures.add(pool.submit(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy)));
+ GetInputPathsCallable callable = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy);
+ getPathsCallableToFuture.put(callable, pool.submit(callable));
}
}
if (pool != null) {
- for (Future<Path> future : futures) {
- if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+ for (Map.Entry<GetInputPathsCallable, Future<Path>> future : getPathsCallableToFuture.entrySet()) {
+ if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
throw new IOException("Operation is Canceled. ");
- finalPathsToAdd.add(future.get());
+ }
+
+ Path newPath = future.getValue().get();
+ updatePathForMapWork(newPath, work, future.getKey().path);
+ finalPathsToAdd.add(newPath);
}
}
@@ -3154,7 +3162,8 @@ public final class Utilities {
@Override
public Path call() throws Exception {
if (!this.skipDummy && isEmptyPath(this.job, this.path, this.ctx)) {
- return createDummyFileForEmptyPartition(this.path, this.job, this.work, this.hiveScratchDir);
+ return createDummyFileForEmptyPartition(this.path, this.job, this.work.getPathToPartitionInfo().get(this.path),
+ this.hiveScratchDir);
}
return this.path;
}
@@ -3192,14 +3201,12 @@ public final class Utilities {
}
@SuppressWarnings("rawtypes")
- private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
- Path hiveScratchDir)
- throws Exception {
+ private static Path createDummyFileForEmptyPartition(Path path, JobConf job, PartitionDesc partDesc,
+ Path hiveScratchDir) throws Exception {
String strPath = path.toString();
// The input file does not exist, replace it by a empty file
- PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
if (partDesc.getTableDesc().isNonNative()) {
// if this isn't a hive table we can't create an empty file for it.
return path;
@@ -3216,16 +3223,19 @@ public final class Utilities {
if (LOG.isInfoEnabled()) {
LOG.info("Changed input file " + strPath + " to empty file " + newPath + " (" + oneRow + ")");
}
+ return newPath;
+ }
+ private static void updatePathForMapWork(Path newPath, MapWork work, Path path) {
// update the work
+ if (!newPath.equals(path)) {
+ PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
+ work.addPathToAlias(newPath, work.getPathToAliases().get(path));
+ work.removePathToAlias(path);
- work.addPathToAlias(newPath, work.getPathToAliases().get(path));
- work.removePathToAlias(path);
-
- work.removePathToPartitionInfo(path);
- work.addPathToPartitionInfo(newPath, partDesc);
-
- return newPath;
+ work.removePathToPartitionInfo(path);
+ work.addPathToPartitionInfo(newPath, partDesc);
+ }
}
@SuppressWarnings("rawtypes")
http://git-wip-us.apache.org/repos/asf/hive/blob/824b9c80/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 650f169..434e206 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -20,6 +20,10 @@ package org.apache.hadoop.hive.ql.exec;
import static org.apache.hadoop.hive.ql.exec.Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension;
import static org.mockito.Mockito.doReturn;
@@ -32,6 +36,7 @@ import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
@@ -242,16 +247,17 @@ public class TestUtilities {
/**
* Check that calling {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean)}
- * can process two different empty tables without throwing any exceptions.
+ * can process two different tables that both have empty partitions.
*/
@Test
- public void testGetInputPathsWithEmptyTables() throws Exception {
+ public void testGetInputPathsWithEmptyPartitions() throws Exception {
String alias1Name = "alias1";
String alias2Name = "alias2";
MapWork mapWork1 = new MapWork();
MapWork mapWork2 = new MapWork();
JobConf jobConf = new JobConf();
+ Configuration conf = new Configuration();
Path nonExistentPath1 = new Path(UUID.randomUUID().toString());
Path nonExistentPath2 = new Path(UUID.randomUUID().toString());
@@ -269,14 +275,14 @@ public class TestUtilities {
mapWork1.setPathToAliases(new LinkedHashMap<>(
ImmutableMap.of(nonExistentPath1, Lists.newArrayList(alias1Name))));
- mapWork1.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(
+ mapWork1.setAliasToWork(new LinkedHashMap<>(
ImmutableMap.of(alias1Name, (Operator<?>) mock(Operator.class))));
mapWork1.setPathToPartitionInfo(new LinkedHashMap<>(
ImmutableMap.of(nonExistentPath1, mockPartitionDesc)));
mapWork2.setPathToAliases(new LinkedHashMap<>(
ImmutableMap.of(nonExistentPath2, Lists.newArrayList(alias2Name))));
- mapWork2.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(
+ mapWork2.setAliasToWork(new LinkedHashMap<>(
ImmutableMap.of(alias2Name, (Operator<?>) mock(Operator.class))));
mapWork2.setPathToPartitionInfo(new LinkedHashMap<>(
ImmutableMap.of(nonExistentPath2, mockPartitionDesc)));
@@ -284,11 +290,22 @@ public class TestUtilities {
List<Path> inputPaths = new ArrayList<>();
try {
Path scratchDir = new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR));
- inputPaths.addAll(Utilities.getInputPaths(jobConf, mapWork1, scratchDir,
- mock(Context.class), false));
- inputPaths.addAll(Utilities.getInputPaths(jobConf, mapWork2, scratchDir,
- mock(Context.class), false));
- assertEquals(inputPaths.size(), 2);
+
+ List<Path> inputPaths1 = Utilities.getInputPaths(jobConf, mapWork1, scratchDir,
+ mock(Context.class), false);
+ inputPaths.addAll(inputPaths1);
+ assertEquals(inputPaths1.size(), 1);
+ assertNotEquals(inputPaths1.get(0), nonExistentPath1);
+ assertTrue(inputPaths1.get(0).getFileSystem(conf).exists(inputPaths1.get(0)));
+ assertFalse(nonExistentPath1.getFileSystem(conf).exists(nonExistentPath1));
+
+ List<Path> inputPaths2 = Utilities.getInputPaths(jobConf, mapWork2, scratchDir,
+ mock(Context.class), false);
+ inputPaths.addAll(inputPaths2);
+ assertEquals(inputPaths2.size(), 1);
+ assertNotEquals(inputPaths2.get(0), nonExistentPath2);
+ assertTrue(inputPaths2.get(0).getFileSystem(conf).exists(inputPaths2.get(0)));
+ assertFalse(nonExistentPath2.getFileSystem(conf).exists(nonExistentPath2));
} finally {
File file;
for (Path path : inputPaths) {
@@ -301,7 +318,72 @@ public class TestUtilities {
}
/**
- * Check that calling {@link Utilities#getMaxExecutorsForInputListing(JobConf, int)}
+ * Check that calling {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean)}
+ * can process two different tables that both have empty partitions when using multiple threads.
+ * Some extra logic is placed at the end of the test to validate no race conditions put the
+ * {@link MapWork} object in an invalid state.
+ */
+ @Test
+ public void testGetInputPathsWithMultipleThreadsAndEmptyPartitions() throws Exception {
+ int numPartitions = 15;
+ JobConf jobConf = new JobConf();
+ jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname,
+ Runtime.getRuntime().availableProcessors() * 2);
+ MapWork mapWork = new MapWork();
+ Path testTablePath = new Path("testTable");
+ Path[] testPartitionsPaths = new Path[numPartitions];
+
+ PartitionDesc mockPartitionDesc = mock(PartitionDesc.class);
+ TableDesc mockTableDesc = mock(TableDesc.class);
+
+ when(mockTableDesc.isNonNative()).thenReturn(false);
+ when(mockTableDesc.getProperties()).thenReturn(new Properties());
+ when(mockPartitionDesc.getProperties()).thenReturn(new Properties());
+ when(mockPartitionDesc.getTableDesc()).thenReturn(mockTableDesc);
+ doReturn(HiveSequenceFileOutputFormat.class).when(
+ mockPartitionDesc).getOutputFileFormatClass();
+
+
+ for (int i = 0; i < numPartitions; i++) {
+ String testPartitionName = "p=" + i;
+ testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
+ mapWork.getPathToAliases().put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));
+ mapWork.getAliasToWork().put(testPartitionName, (Operator<?>) mock(Operator.class));
+ mapWork.getPathToPartitionInfo().put(testPartitionsPaths[i], mockPartitionDesc);
+
+ }
+
+ FileSystem fs = FileSystem.getLocal(jobConf);
+
+ try {
+ fs.mkdirs(testTablePath);
+ List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork,
+ new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR)), mock(Context.class), false);
+ assertEquals(inputPaths.size(), numPartitions);
+
+ for (int i = 0; i < numPartitions; i++) {
+ assertNotEquals(inputPaths.get(i), testPartitionsPaths[i]);
+ }
+
+ assertEquals(mapWork.getPathToAliases().size(), numPartitions);
+ assertEquals(mapWork.getPathToPartitionInfo().size(), numPartitions);
+ assertEquals(mapWork.getAliasToWork().size(), numPartitions);
+
+ for (Map.Entry<Path, ArrayList<String>> entry : mapWork.getPathToAliases().entrySet()) {
+ assertNotNull(entry.getKey());
+ assertNotNull(entry.getValue());
+ assertEquals(entry.getValue().size(), 1);
+ assertTrue(entry.getKey().getFileSystem(new Configuration()).exists(entry.getKey()));
+ }
+ } finally {
+ if (fs.exists(testTablePath)) {
+ fs.delete(testTablePath, true);
+ }
+ }
+ }
+
+ /**
+ * Check that calling {@link Utilities#getMaxExecutorsForInputListing(Configuration, int)}
* returns the maximum number of executors to use based on the number of input locations.
*/
@Test
@@ -413,7 +495,7 @@ public class TestUtilities {
Path testTablePath = new Path(testTableName);
Path[] testPartitionsPaths = new Path[numOfPartitions];
for (int i=0; i<numOfPartitions; i++) {
- String testPartitionName = "p=" + 1;
+ String testPartitionName = "p=" + i;
testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
pathToAliasTable.put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));