You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2017/05/30 18:44:03 UTC

hive git commit: HIVE-16665: Race condition in Utilities.GetInputPathsCallable --> createDummyFileForEmptyPartition (Sahil Takiar, reviewed by Sergio Pena, Vihang Karajgaonkar)

Repository: hive
Updated Branches:
  refs/heads/master 4cd425132 -> 824b9c80b


HIVE-16665: Race condition in Utilities.GetInputPathsCallable --> createDummyFileForEmptyPartition (Sahil Takiar, reviewed by Sergio Pena, Vihang Karajgaonkar)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/824b9c80
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/824b9c80
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/824b9c80

Branch: refs/heads/master
Commit: 824b9c80b443dc4e2b9ad35214a23ac756e75234
Parents: 4cd4251
Author: Sahil Takiar <ta...@gmail.com>
Authored: Tue May 30 13:43:32 2017 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Tue May 30 13:43:32 2017 -0500

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  48 +++++----
 .../hadoop/hive/ql/exec/TestUtilities.java      | 104 +++++++++++++++++--
 2 files changed, 122 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/824b9c80/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index ebf1344..c70e1e0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -3110,22 +3110,30 @@ public final class Utilities {
     }
 
     List<Path> finalPathsToAdd = new LinkedList<>();
-    List<Future<Path>> futures = new LinkedList<>();
+    Map<GetInputPathsCallable, Future<Path>> getPathsCallableToFuture = new LinkedHashMap<>();
     for (final Path path : pathsToAdd) {
-      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+      if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
         throw new IOException("Operation is Canceled. ");
+      }
       if (pool == null) {
-        finalPathsToAdd.add(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call());
+        Path newPath = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy).call();
+        updatePathForMapWork(newPath, work, path);
+        finalPathsToAdd.add(newPath);
       } else {
-        futures.add(pool.submit(new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy)));
+        GetInputPathsCallable callable = new GetInputPathsCallable(path, job, work, hiveScratchDir, ctx, skipDummy);
+        getPathsCallableToFuture.put(callable, pool.submit(callable));
       }
     }
 
     if (pool != null) {
-      for (Future<Path> future : futures) {
-        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT)
+      for (Map.Entry<GetInputPathsCallable, Future<Path>> future : getPathsCallableToFuture.entrySet()) {
+        if (lDrvStat != null && lDrvStat.driverState == DriverState.INTERRUPT) {
           throw new IOException("Operation is Canceled. ");
-        finalPathsToAdd.add(future.get());
+        }
+
+        Path newPath = future.getValue().get();
+        updatePathForMapWork(newPath, work, future.getKey().path);
+        finalPathsToAdd.add(newPath);
       }
     }
 
@@ -3154,7 +3162,8 @@ public final class Utilities {
     @Override
     public Path call() throws Exception {
       if (!this.skipDummy && isEmptyPath(this.job, this.path, this.ctx)) {
-        return createDummyFileForEmptyPartition(this.path, this.job, this.work, this.hiveScratchDir);
+        return createDummyFileForEmptyPartition(this.path, this.job, this.work.getPathToPartitionInfo().get(this.path),
+                this.hiveScratchDir);
       }
       return this.path;
     }
@@ -3192,14 +3201,12 @@ public final class Utilities {
   }
 
   @SuppressWarnings("rawtypes")
-  private static Path createDummyFileForEmptyPartition(Path path, JobConf job, MapWork work,
-      Path hiveScratchDir)
-          throws Exception {
+  private static Path createDummyFileForEmptyPartition(Path path, JobConf job, PartitionDesc partDesc,
+                                                       Path hiveScratchDir) throws Exception {
 
     String strPath = path.toString();
 
     // The input file does not exist, replace it by a empty file
-    PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
     if (partDesc.getTableDesc().isNonNative()) {
       // if this isn't a hive table we can't create an empty file for it.
       return path;
@@ -3216,16 +3223,19 @@ public final class Utilities {
     if (LOG.isInfoEnabled()) {
       LOG.info("Changed input file " + strPath + " to empty file " + newPath + " (" + oneRow + ")");
     }
+    return newPath;
+  }
 
+  private static void updatePathForMapWork(Path newPath, MapWork work, Path path) {
     // update the work
+    if (!newPath.equals(path)) {
+      PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
+      work.addPathToAlias(newPath, work.getPathToAliases().get(path));
+      work.removePathToAlias(path);
 
-    work.addPathToAlias(newPath, work.getPathToAliases().get(path));
-    work.removePathToAlias(path);
-
-    work.removePathToPartitionInfo(path);
-    work.addPathToPartitionInfo(newPath, partDesc);
-
-    return newPath;
+      work.removePathToPartitionInfo(path);
+      work.addPathToPartitionInfo(newPath, partDesc);
+    }
   }
 
   @SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/hive/blob/824b9c80/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 650f169..434e206 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -20,6 +20,10 @@ package org.apache.hadoop.hive.ql.exec;
 
 import static org.apache.hadoop.hive.ql.exec.Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.apache.hadoop.hive.ql.exec.Utilities.getFileExtension;
 import static org.mockito.Mockito.doReturn;
@@ -32,6 +36,7 @@ import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.ExecutorService;
@@ -242,16 +247,17 @@ public class TestUtilities {
 
   /**
    * Check that calling {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean)}
-   * can process two different empty tables without throwing any exceptions.
+   * can process two different tables that both have empty partitions.
    */
   @Test
-  public void testGetInputPathsWithEmptyTables() throws Exception {
+  public void testGetInputPathsWithEmptyPartitions() throws Exception {
     String alias1Name = "alias1";
     String alias2Name = "alias2";
 
     MapWork mapWork1 = new MapWork();
     MapWork mapWork2 = new MapWork();
     JobConf jobConf = new JobConf();
+    Configuration conf = new Configuration();
 
     Path nonExistentPath1 = new Path(UUID.randomUUID().toString());
     Path nonExistentPath2 = new Path(UUID.randomUUID().toString());
@@ -269,14 +275,14 @@ public class TestUtilities {
 
     mapWork1.setPathToAliases(new LinkedHashMap<>(
             ImmutableMap.of(nonExistentPath1, Lists.newArrayList(alias1Name))));
-    mapWork1.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(
+    mapWork1.setAliasToWork(new LinkedHashMap<>(
             ImmutableMap.of(alias1Name, (Operator<?>) mock(Operator.class))));
     mapWork1.setPathToPartitionInfo(new LinkedHashMap<>(
             ImmutableMap.of(nonExistentPath1, mockPartitionDesc)));
 
     mapWork2.setPathToAliases(new LinkedHashMap<>(
             ImmutableMap.of(nonExistentPath2, Lists.newArrayList(alias2Name))));
-    mapWork2.setAliasToWork(new LinkedHashMap<String, Operator<? extends OperatorDesc>>(
+    mapWork2.setAliasToWork(new LinkedHashMap<>(
             ImmutableMap.of(alias2Name, (Operator<?>) mock(Operator.class))));
     mapWork2.setPathToPartitionInfo(new LinkedHashMap<>(
             ImmutableMap.of(nonExistentPath2, mockPartitionDesc)));
@@ -284,11 +290,22 @@ public class TestUtilities {
     List<Path> inputPaths = new ArrayList<>();
     try {
       Path scratchDir = new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR));
-      inputPaths.addAll(Utilities.getInputPaths(jobConf, mapWork1, scratchDir,
-              mock(Context.class), false));
-      inputPaths.addAll(Utilities.getInputPaths(jobConf, mapWork2, scratchDir,
-              mock(Context.class), false));
-      assertEquals(inputPaths.size(), 2);
+
+      List<Path> inputPaths1 = Utilities.getInputPaths(jobConf, mapWork1, scratchDir,
+              mock(Context.class), false);
+      inputPaths.addAll(inputPaths1);
+      assertEquals(inputPaths1.size(), 1);
+      assertNotEquals(inputPaths1.get(0), nonExistentPath1);
+      assertTrue(inputPaths1.get(0).getFileSystem(conf).exists(inputPaths1.get(0)));
+      assertFalse(nonExistentPath1.getFileSystem(conf).exists(nonExistentPath1));
+
+      List<Path> inputPaths2 = Utilities.getInputPaths(jobConf, mapWork2, scratchDir,
+              mock(Context.class), false);
+      inputPaths.addAll(inputPaths2);
+      assertEquals(inputPaths2.size(), 1);
+      assertNotEquals(inputPaths2.get(0), nonExistentPath2);
+      assertTrue(inputPaths2.get(0).getFileSystem(conf).exists(inputPaths2.get(0)));
+      assertFalse(nonExistentPath2.getFileSystem(conf).exists(nonExistentPath2));
     } finally {
       File file;
       for (Path path : inputPaths) {
@@ -301,7 +318,72 @@ public class TestUtilities {
   }
 
   /**
-   * Check that calling {@link Utilities#getMaxExecutorsForInputListing(JobConf, int)}
+   * Check that calling {@link Utilities#getInputPaths(JobConf, MapWork, Path, Context, boolean)}
+   * can process a single table with multiple empty partitions when using multiple threads.
+   * Some extra logic is placed at the end of the test to validate no race conditions put the
+   * {@link MapWork} object in an invalid state.
+   */
+  @Test
+  public void testGetInputPathsWithMultipleThreadsAndEmptyPartitions() throws Exception {
+    int numPartitions = 15;
+    JobConf jobConf = new JobConf();
+    jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname,
+            Runtime.getRuntime().availableProcessors() * 2);
+    MapWork mapWork = new MapWork();
+    Path testTablePath = new Path("testTable");
+    Path[] testPartitionsPaths = new Path[numPartitions];
+
+    PartitionDesc mockPartitionDesc = mock(PartitionDesc.class);
+    TableDesc mockTableDesc = mock(TableDesc.class);
+
+    when(mockTableDesc.isNonNative()).thenReturn(false);
+    when(mockTableDesc.getProperties()).thenReturn(new Properties());
+    when(mockPartitionDesc.getProperties()).thenReturn(new Properties());
+    when(mockPartitionDesc.getTableDesc()).thenReturn(mockTableDesc);
+    doReturn(HiveSequenceFileOutputFormat.class).when(
+            mockPartitionDesc).getOutputFileFormatClass();
+
+
+    for (int i = 0; i < numPartitions; i++) {
+      String testPartitionName = "p=" + i;
+      testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
+      mapWork.getPathToAliases().put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));
+      mapWork.getAliasToWork().put(testPartitionName, (Operator<?>) mock(Operator.class));
+      mapWork.getPathToPartitionInfo().put(testPartitionsPaths[i], mockPartitionDesc);
+
+    }
+
+    FileSystem fs = FileSystem.getLocal(jobConf);
+
+    try {
+      fs.mkdirs(testTablePath);
+      List<Path> inputPaths = Utilities.getInputPaths(jobConf, mapWork,
+              new Path(HiveConf.getVar(jobConf, HiveConf.ConfVars.LOCALSCRATCHDIR)), mock(Context.class), false);
+      assertEquals(inputPaths.size(), numPartitions);
+
+      for (int i = 0; i < numPartitions; i++) {
+        assertNotEquals(inputPaths.get(i), testPartitionsPaths[i]);
+      }
+
+      assertEquals(mapWork.getPathToAliases().size(), numPartitions);
+      assertEquals(mapWork.getPathToPartitionInfo().size(), numPartitions);
+      assertEquals(mapWork.getAliasToWork().size(), numPartitions);
+
+      for (Map.Entry<Path, ArrayList<String>> entry : mapWork.getPathToAliases().entrySet()) {
+        assertNotNull(entry.getKey());
+        assertNotNull(entry.getValue());
+        assertEquals(entry.getValue().size(), 1);
+        assertTrue(entry.getKey().getFileSystem(new Configuration()).exists(entry.getKey()));
+      }
+    } finally {
+      if (fs.exists(testTablePath)) {
+        fs.delete(testTablePath, true);
+      }
+    }
+  }
+
+  /**
+   * Check that calling {@link Utilities#getMaxExecutorsForInputListing(Configuration, int)}
    * returns the maximum number of executors to use based on the number of input locations.
    */
   @Test
@@ -413,7 +495,7 @@ public class TestUtilities {
     Path testTablePath = new Path(testTableName);
     Path[] testPartitionsPaths = new Path[numOfPartitions];
     for (int i=0; i<numOfPartitions; i++) {
-      String testPartitionName = "p=" + 1;
+      String testPartitionName = "p=" + i;
       testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
 
       pathToAliasTable.put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));