Posted to commits@hive.apache.org by pv...@apache.org on 2019/02/12 08:36:38 UTC

[hive] branch master updated: HIVE-21071: Improve getInputSummary (BELUGA BEHR via Peter Vary)

This is an automated email from the ASF dual-hosted git repository.

pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 7700359  HIVE-21071: Improve getInputSummary (BELUGA BEHR via Peter Vary)
7700359 is described below

commit 77003590688512f5a62349868b62e4ed66075184
Author: BELUGA BEHR <da...@gmail.com>
AuthorDate: Tue Feb 12 09:35:34 2019 +0100

    HIVE-21071: Improve getInputSummary (BELUGA BEHR via Peter Vary)
---
 .../org/apache/hadoop/hive/ql/exec/Utilities.java  | 145 ++++---
 .../hadoop/hive/ql/exec/TestGetInputSummary.java   | 443 +++++++++++++++++++++
 .../apache/hadoop/hive/ql/exec/TestUtilities.java  | 212 ----------
 3 files changed, 524 insertions(+), 276 deletions(-)
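
For readers skimming the patch: the core change is that per-path results are no
longer collected into a ConcurrentHashMap and summed afterwards; instead each
worker adds its ContentSummary into AtomicLong accumulators as soon as it is
computed, and the final ContentSummary is assembled with ContentSummary.Builder.
The standalone sketch below illustrates only that accumulation pattern; it is
not part of the patch, the class name InputSummarySketch is made up for the
example, and a plain FileSystem.getContentSummary() call stands in for Hive's
format- and estimator-specific lookups.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/** Illustrative only: accumulate per-path summaries into shared atomic totals. */
public class InputSummarySketch {

  public static ContentSummary summarize(final Configuration conf,
      final Collection<Path> paths, final int numThreads) throws IOException {
    final AtomicLong totalLength = new AtomicLong(0L);
    final AtomicLong totalFileCount = new AtomicLong(0L);
    final AtomicLong totalDirectoryCount = new AtomicLong(0L);

    final ExecutorService executor =
        (numThreads > 1) ? Executors.newFixedThreadPool(numThreads) : null;
    final List<Future<?>> futures = new ArrayList<>(paths.size());
    try {
      for (final Path path : paths) {
        final Runnable r = () -> {
          try {
            // Each worker fetches one summary and folds it into the totals.
            final FileSystem fs = path.getFileSystem(conf);
            final ContentSummary cs = fs.getContentSummary(path);
            totalLength.addAndGet(cs.getLength());
            totalFileCount.addAndGet(cs.getFileCount());
            totalDirectoryCount.addAndGet(cs.getDirectoryCount());
          } catch (IOException e) {
            // Mirrors the patch: a path whose size cannot be read is skipped.
          }
        };
        if (executor == null) {
          r.run();
        } else {
          futures.add(executor.submit(r));
        }
      }
      // Wait for all submitted lookups to finish.
      for (final Future<?> future : futures) {
        try {
          future.get();
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          break;
        } catch (ExecutionException e) {
          throw new IOException(e);
        }
      }
    } finally {
      if (executor != null) {
        executor.shutdownNow();
      }
    }
    // Assemble the final summary from the accumulated totals.
    return new ContentSummary.Builder().length(totalLength.get())
        .fileCount(totalFileCount.get())
        .directoryCount(totalDirectoryCount.get()).build();
  }
}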

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b84b052..61e3430 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -18,12 +18,6 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
 import java.beans.DefaultPersistenceDelegate;
 import java.beans.Encoder;
 import java.beans.Expression;
@@ -53,6 +47,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -65,17 +60,18 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.zip.Deflater;
 import java.util.zip.DeflaterOutputStream;
 import java.util.zip.InflaterInputStream;
 
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
 import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.MapUtils;
@@ -138,7 +134,6 @@ import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
 import org.apache.hadoop.hive.ql.io.RCFile;
 import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
 import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
 import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
 import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -211,9 +206,12 @@ import org.apache.hive.common.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 
 /**
@@ -2435,36 +2433,32 @@ public final class Utilities {
     PerfLogger perfLogger = SessionState.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
 
-    long[] summary = {0, 0, 0};
-
+    final long[] summary = {0L, 0L, 0L};
     final Set<Path> pathNeedProcess = new HashSet<>();
 
     // Since multiple threads could call this method concurrently, locking
     // this method keeps the number of spawned threads under control.
     synchronized (INPUT_SUMMARY_LOCK) {
       // For each input path, calculate the total size.
-      for (Path path : work.getPathToAliases().keySet()) {
-        Path p = path;
-
-        if (filter != null && !filter.accept(p)) {
+      for (final Path path : work.getPathToAliases().keySet()) {
+        if (path == null) {
+          continue;
+        }
+        if (filter != null && !filter.accept(path)) {
           continue;
         }
 
         ContentSummary cs = ctx.getCS(path);
-        if (cs == null) {
-          if (path == null) {
-            continue;
-          }
-          pathNeedProcess.add(path);
-        } else {
+        if (cs != null) {
           summary[0] += cs.getLength();
           summary[1] += cs.getFileCount();
           summary[2] += cs.getDirectoryCount();
+        } else {
+          pathNeedProcess.add(path);
         }
       }
 
       // Process the case when name node call is needed
-      final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
       final ExecutorService executor;
 
       int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size());
@@ -2476,17 +2470,36 @@ public final class Utilities {
       } else {
         executor = null;
       }
-      ContentSummary cs = getInputSummaryWithPool(ctx, pathNeedProcess, work, summary, executor);
+      getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess),
+          work, summary, executor);
       perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
-      return cs;
     }
+    return new ContentSummary.Builder().length(summary[0])
+        .fileCount(summary[1]).directoryCount(summary[2]).build();
   }
 
+  /**
+   * Performs a ContentSummary lookup over a set of paths using one or more
+   * threads. The 'summary' argument is modified in place.
+   *
+   * @param ctx query context; provides the configuration and caches results
+   * @param pathNeedProcess paths whose summaries must be fetched from the file system
+   * @param work MapWork describing the partitions and aliases behind the paths
+   * @param summary running totals of [length, file count, directory count], updated in place
+   * @param executor pool to run the lookups on, or null to run on the calling thread
+   * @throws IOException if collecting the summaries fails
+   */
   @VisibleForTesting
-  static ContentSummary getInputSummaryWithPool(final Context ctx, Set<Path> pathNeedProcess, MapWork work,
-                                                long[] summary, ExecutorService executor) throws IOException {
-    List<Future<?>> results = new ArrayList<Future<?>>();
-    final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
+  static void getInputSummaryWithPool(final Context ctx,
+      final Set<Path> pathNeedProcess, final MapWork work, final long[] summary,
+      final ExecutorService executor) throws IOException {
+    Preconditions.checkNotNull(ctx);
+    Preconditions.checkNotNull(pathNeedProcess);
+
+    List<Future<?>> futures = new ArrayList<Future<?>>(pathNeedProcess.size());
+    final AtomicLong totalLength = new AtomicLong(0L);
+    final AtomicLong totalFileCount = new AtomicLong(0L);
+    final AtomicLong totalDirectoryCount = new AtomicLong(0L);
 
     HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() {
       @Override
@@ -2506,9 +2519,7 @@ public final class Utilities {
     try {
       Configuration conf = ctx.getConf();
       JobConf jobConf = new JobConf(conf);
-      for (Path path : pathNeedProcess) {
-        final Path p = path;
-        final String pathStr = path.toString();
+      for (final Path path : pathNeedProcess) {
         // All threads share the same Configuration and JobConf based on the
         // assumption that they are thread safe if only read operations are
         // executed. It is not stated in Hadoop's javadoc, the source codes
@@ -2519,7 +2530,7 @@ public final class Utilities {
         final JobConf myJobConf = jobConf;
         final Map<String, Operator<?>> aliasToWork = work.getAliasToWork();
         final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases();
-        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p);
+        final PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
         Runnable r = new Runnable() {
           @Override
           public void run() {
@@ -2529,11 +2540,11 @@ public final class Utilities {
               InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
                       inputFormatCls, myJobConf);
               if (inputFormatObj instanceof ContentSummaryInputFormat) {
-                ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
-                resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
+                ContentSummaryInputFormat csif = (ContentSummaryInputFormat) inputFormatObj;
+                final ContentSummary cs = csif.getContentSummary(path, myJobConf);
+                recordSummary(path, cs);
                 return;
               }
-
               String metaTableStorage = null;
               if (partDesc.getTableDesc() != null &&
                       partDesc.getTableDesc().getProperties() != null) {
@@ -2550,7 +2561,7 @@ public final class Utilities {
                 long total = 0;
                 TableDesc tableDesc = partDesc.getTableDesc();
                 InputEstimator estimator = (InputEstimator) handler;
-                for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
+                for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, path)) {
                   JobConf jobConf = new JobConf(myJobConf);
                   TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
                   Utilities.setColumnNameList(jobConf, scanOp, true);
@@ -2559,12 +2570,12 @@ public final class Utilities {
                   Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
                   total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
                 }
-                resultMap.put(pathStr, new ContentSummary(total, -1, -1));
+                recordSummary(path, new ContentSummary(total, -1, -1));
               } else {
                 // todo: should nullify summary for non-native tables,
                 // not to be selected as a mapjoin target
-                FileSystem fs = p.getFileSystem(myConf);
-                resultMap.put(pathStr, fs.getContentSummary(p));
+                FileSystem fs = path.getFileSystem(myConf);
+                recordSummary(path, fs.getContentSummary(path));
               }
             } catch (Exception e) {
               // We safely ignore this exception for summary data.
@@ -2572,28 +2583,46 @@ public final class Utilities {
               // usages. The worst case is that IOException will always be
               // retried for another getInputSummary(), which is fine as
               // IOException is not considered as a common case.
-              LOG.info("Cannot get size of {}. Safely ignored.", pathStr);
+              LOG.info("Cannot get size of {}. Safely ignored.", path);
+              LOG.debug("Cannot get size of {}. Safely ignored.", path, e);
             }
           }
+
+          private void recordSummary(final Path p, final ContentSummary cs) {
+            final long csLength = cs.getLength();
+            final long csFileCount = cs.getFileCount();
+            final long csDirectoryCount = cs.getDirectoryCount();
+
+            totalLength.addAndGet(csLength);
+            totalFileCount.addAndGet(csFileCount);
+            totalDirectoryCount.addAndGet(csDirectoryCount);
+
+            ctx.addCS(p.toString(), cs);
+
+            LOG.debug(
+                "Cache Content Summary for {} length: {} file count: {} "
+                    + "directory count: {}",
+                path, csLength, csFileCount, csDirectoryCount);
+          }
         };
 
         if (executor == null) {
           r.run();
         } else {
-          Future<?> result = executor.submit(r);
-          results.add(result);
+          Future<?> future = executor.submit(r);
+          futures.add(future);
         }
       }
 
       if (executor != null) {
-        for (Future<?> result : results) {
+        for (Future<?> future : futures) {
           boolean executorDone = false;
           do {
             try {
-              result.get();
+              future.get();
               executorDone = true;
             } catch (InterruptedException e) {
-              LOG.info("Interrupted when waiting threads: ", e);
+              LOG.info("Interrupted while waiting for threads", e);
               Thread.currentThread().interrupt();
               break;
             } catch (ExecutionException e) {
@@ -2604,22 +2633,10 @@ public final class Utilities {
         executor.shutdown();
       }
       HiveInterruptUtils.checkInterrupted();
-      for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
-        ContentSummary cs = entry.getValue();
-
-        summary[0] += cs.getLength();
-        summary[1] += cs.getFileCount();
-        summary[2] += cs.getDirectoryCount();
-
-        ctx.addCS(entry.getKey(), cs);
-        if (LOG.isInfoEnabled()) {
-          LOG.info("Cache Content Summary for {} length: {} file count: {} " +
-            " directory count: {}", entry.getKey(), cs.getLength(),
-            cs.getFileCount(), cs.getDirectoryCount());
-        }
-      }
 
-      return new ContentSummary(summary[0], summary[1], summary[2]);
+      summary[0] += totalLength.get();
+      summary[1] += totalFileCount.get();
+      summary[2] += totalDirectoryCount.get();
     } finally {
       if (executor != null) {
         executor.shutdownNow();
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java
new file mode 100644
index 0000000..a946b4f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestGetInputSummary {
+
+  private static final String TEST_TABLE_NAME = "testTable";
+  private static final Path TEST_TABLE_PATH = new Path(TEST_TABLE_NAME);
+
+  private JobConf jobConf;
+  private Properties properties;
+
+  @Before
+  public void setup() throws Exception {
+    // creates scratch directories needed by the Context object
+    SessionState.start(new HiveConf());
+
+    this.jobConf = new JobConf();
+    this.properties = new Properties();
+
+    final FileSystem fs = FileSystem.getLocal(jobConf);
+    fs.delete(TEST_TABLE_PATH, true);
+    fs.mkdirs(TEST_TABLE_PATH);
+  }
+
+  @After
+  public void teardown() throws Exception {
+    final FileSystem fs = FileSystem.getLocal(jobConf);
+    fs.delete(TEST_TABLE_PATH, true);
+  }
+
+  @Test
+  public void testGetInputSummaryPoolWithCache() throws Exception {
+    final int BYTES_PER_FILE = 5;
+
+    final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+        new Path("p2/test.txt"), new Path("p3/test.txt"),
+        new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+    ContentSummary cs = new ContentSummary.Builder().directoryCount(10L)
+        .fileCount(10L).length(10L).build();
+
+    Map<Path, ContentSummary> cache = new LinkedHashMap<>();
+    cache.put(new Path("p2"), cs);
+
+    jobConf.setInt(
+        HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+
+    ContentSummary summary = runTestGetInputSummary(jobConf, properties,
+        testPaths, BYTES_PER_FILE, HiveInputFormat.class, cache);
+
+    // Each partition path contains a single 5-byte file. However, one cache
+    // entry was added which claims that its partition has a length of 10
+    // bytes, 10 files, and 10 directories. The cached values should override
+    // the real ones, because the cache is consulted before the file system
+    // is queried for that path.
+    final long expectedLength = ((testPaths.size() - 1) * BYTES_PER_FILE) + 10L;
+    final long expectedFileCount = (testPaths.size() - 1) + 10L;
+    final long expectedDirCount = (testPaths.size() - 1) + 10L;
+
+    assertEquals(expectedLength, summary.getLength());
+    assertEquals(expectedFileCount, summary.getFileCount());
+    assertEquals(expectedDirCount, summary.getDirectoryCount());
+  }
+
+  /**
+   * Read several files so that their summaries are cached, then delete those
+   * files and read them again to verify that the results of the first read are
+   * served from the cache. If the cache is not working, the totals will be off,
+   * since the deleted files no longer exist in the file system.
+   *
+   * @throws Exception if the test fails
+   */
+  @Test
+  public void testGetInputSummaryPoolWithCacheReuse() throws Exception {
+    final int BYTES_PER_FILE = 5;
+
+    final Collection<Path> testPaths1 = Arrays.asList(new Path("p1/test.txt"),
+        new Path("p2/test.txt"), new Path("p3/test.txt"));
+    final Collection<Path> testPaths2 =
+        Arrays.asList(new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+    jobConf.setInt(
+        HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+
+    ContentSummary summary =
+        runTestGetInputSummary(jobConf, properties, testPaths1, BYTES_PER_FILE,
+            HiveInputFormat.class, Collections.emptyMap());
+
+    // Ensure the first group of files were read correctly
+    assertEquals(testPaths1.size() * BYTES_PER_FILE, summary.getLength());
+    assertEquals(testPaths1.size(), summary.getFileCount());
+    assertEquals(testPaths1.size(), summary.getDirectoryCount());
+
+    // Delete the files from the first group
+    final FileSystem fs = FileSystem.getLocal(jobConf);
+    for (final Path path : testPaths1) {
+      fs.delete(path, true);
+    }
+
+    // Read all the files; the first group's stats should be pulled from the
+    // cache and the second group's from the file system
+    summary = runTestGetInputSummary(jobConf, properties,
+        CollectionUtils.union(testPaths1, testPaths2), BYTES_PER_FILE,
+        new HashSet<>(testPaths1), HiveInputFormat.class,
+        Collections.emptyMap());
+
+    assertEquals((testPaths1.size() + testPaths2.size()) * BYTES_PER_FILE,
+        summary.getLength());
+    assertEquals((testPaths1.size() + testPaths2.size()),
+        summary.getFileCount());
+    assertEquals((testPaths1.size() + testPaths2.size()),
+        summary.getDirectoryCount());
+  }
+
+  @Test
+  @SuppressWarnings("deprecation")
+  public void testGetInputSummaryWithMultipleThreads() throws IOException {
+    final int BYTES_PER_FILE = 5;
+
+    final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+        new Path("p2/test.txt"), new Path("p3/test.txt"),
+        new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+    jobConf.setInt(
+        HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
+    ContentSummary summary =
+        runTestGetInputSummary(jobConf, properties, testPaths, BYTES_PER_FILE,
+            HiveInputFormat.class, Collections.emptyMap());
+    assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+    assertEquals(testPaths.size(), summary.getFileCount());
+    assertEquals(testPaths.size(), summary.getDirectoryCount());
+
+    // Test deprecated mapred.dfsclient.parallelism.max
+    jobConf.setInt(
+        HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+    jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
+    summary = runTestGetInputSummary(jobConf, properties, testPaths,
+        BYTES_PER_FILE, HiveInputFormat.class, Collections.emptyMap());
+    assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+    assertEquals(testPaths.size(), summary.getFileCount());
+    assertEquals(testPaths.size(), summary.getDirectoryCount());
+  }
+
+  @Test
+  public void testGetInputSummaryWithInputEstimator()
+      throws IOException, HiveException {
+    final int BYTES_PER_FILE = 10;
+    final int NUM_OF_ROWS = 5;
+
+    final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+        new Path("p2/test.txt"), new Path("p3/test.txt"),
+        new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+    jobConf.setInt(
+        HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
+
+    properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE,
+        InputEstimatorTestClass.class.getName());
+    InputEstimatorTestClass.setEstimation(
+        new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
+
+    /*
+     * Write twice as many bytes to the files to verify that the size reported
+     * by the Estimator is used rather than the size read from the file system.
+     */
+    ContentSummary summary =
+        runTestGetInputSummary(jobConf, properties, testPaths,
+            BYTES_PER_FILE * 2, HiveInputFormat.class, Collections.emptyMap());
+    assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+
+    // Current getInputSummary() returns -1 for each file found
+    assertEquals(testPaths.size() * -1, summary.getFileCount());
+
+    // Current getInputSummary() returns -1 per path for the directory count
+    assertEquals(testPaths.size() * -1, summary.getDirectoryCount());
+
+    // Test deprecated mapred.dfsclient.parallelism.max
+    jobConf.setInt(
+        HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+    jobConf.setInt(
+        Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
+
+    properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE,
+        InputEstimatorTestClass.class.getName());
+    InputEstimatorTestClass.setEstimation(
+        new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
+
+    /*
+     * Write twice as many bytes to the files to verify that the size reported
+     * by the Estimator is used rather than the size read from the file system.
+     */
+    summary = runTestGetInputSummary(jobConf, properties, testPaths,
+        BYTES_PER_FILE * 2, HiveInputFormat.class, Collections.emptyMap());
+    assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+
+    // Current getInputSummary() returns -1 for each file found
+    assertEquals(testPaths.size() * -1, summary.getFileCount());
+
+    // Current getInputSummary() returns -1 per path for the directory count
+    assertEquals(testPaths.size() * -1, summary.getDirectoryCount());
+  }
+
+  @Test
+  public void testGetInputSummaryWithASingleThread() throws IOException {
+    final int BYTES_PER_FILE = 5;
+
+    // Set to zero threads to disable thread pool
+    jobConf.setInt(
+        HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+
+    final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+        new Path("p2/test.txt"), new Path("p3/test.txt"),
+        new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+    ContentSummary summary =
+        runTestGetInputSummary(jobConf, properties, testPaths, BYTES_PER_FILE,
+            HiveInputFormat.class, Collections.emptyMap());
+    assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+    assertEquals(testPaths.size(), summary.getFileCount());
+    assertEquals(testPaths.size(), summary.getDirectoryCount());
+  }
+
+  @Test
+  public void testGetInputSummaryWithContentSummaryInputFormat()
+      throws IOException {
+    final int BYTES_PER_FILE = 10;
+
+    final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+        new Path("p2/test.txt"), new Path("p3/test.txt"),
+        new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+    jobConf.setInt(ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
+
+    ContentSummaryInputFormatTestClass
+        .setContentSummary(new ContentSummary.Builder().length(BYTES_PER_FILE)
+            .fileCount(2).directoryCount(1).build());
+
+    /*
+     * Write twice as many bytes to the files to verify that the summary comes
+     * from ContentSummaryInputFormat rather than from the file system.
+     */
+    ContentSummary summary = runTestGetInputSummary(jobConf, properties,
+        testPaths, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class,
+        Collections.emptyMap());
+    assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+    assertEquals(testPaths.size() * 2, summary.getFileCount());
+    assertEquals(testPaths.size(), summary.getDirectoryCount());
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetInputSummaryPool()
+      throws ExecutionException, InterruptedException, IOException {
+    ExecutorService pool = mock(ExecutorService.class);
+    when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+
+    Set<Path> pathNeedProcess = new HashSet<>();
+    pathNeedProcess.add(new Path("dummy-path1"));
+    pathNeedProcess.add(new Path("dummy-path2"));
+    pathNeedProcess.add(new Path("dummy-path3"));
+
+    Context context = new Context(jobConf);
+
+    Utilities.getInputSummaryWithPool(context, pathNeedProcess,
+        mock(MapWork.class), new long[3], pool);
+    verify(pool, times(3)).submit(any(Runnable.class));
+    verify(pool).shutdown();
+    verify(pool).shutdownNow();
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGetInputSummaryPoolAndFailure()
+      throws ExecutionException, InterruptedException, IOException {
+    ExecutorService pool = mock(ExecutorService.class);
+    when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+
+    Set<Path> pathNeedProcess = new HashSet<>();
+    pathNeedProcess.add(new Path("dummy-path1"));
+    pathNeedProcess.add(new Path("dummy-path2"));
+    pathNeedProcess.add(new Path("dummy-path3"));
+
+    Context context = new Context(jobConf);
+
+    Utilities.getInputSummaryWithPool(context, pathNeedProcess,
+        mock(MapWork.class), new long[3], pool);
+    verify(pool, times(3)).submit(any(Runnable.class));
+    verify(pool).shutdown();
+    verify(pool).shutdownNow();
+  }
+
+  @SuppressWarnings("rawtypes")
+  private ContentSummary runTestGetInputSummary(JobConf jobConf,
+      Properties properties, Collection<Path> testPaths, int bytesPerFile,
+      Class<? extends InputFormat> inputFormatClass,
+      Map<Path, ContentSummary> cache) throws IOException {
+    return runTestGetInputSummary(jobConf, properties, testPaths, bytesPerFile,
+        Collections.emptyList(), inputFormatClass, cache);
+  }
+
+  @SuppressWarnings("rawtypes")
+  private ContentSummary runTestGetInputSummary(JobConf jobConf,
+      Properties properties, Collection<Path> testPaths, int bytesPerFile,
+      Collection<Path> providedPaths,
+      Class<? extends InputFormat> inputFormatClass,
+      Map<Path, ContentSummary> cache) throws IOException {
+
+    final FileSystem fs = FileSystem.getLocal(jobConf);
+
+    MapWork mapWork = new MapWork();
+    Context context = new Context(jobConf);
+
+    for (Map.Entry<Path, ContentSummary> entry : cache.entrySet()) {
+      final Path partitionPath = new Path(TEST_TABLE_PATH, entry.getKey());
+      context.addCS(partitionPath.toString(), entry.getValue());
+    }
+
+    LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo =
+        new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> pathToAliasTable =
+        new LinkedHashMap<>();
+    TableScanOperator scanOp = new TableScanOperator();
+
+    PartitionDesc partitionDesc = new PartitionDesc(
+        new TableDesc(inputFormatClass, null, properties), null);
+
+    for (final Path path : testPaths) {
+      final Path fullPath = new Path(TEST_TABLE_PATH, path);
+      final Path partitionPath = fullPath.getParent();
+
+      // If it is not provided by the test case, create a dummy file
+      if (!providedPaths.contains(path)) {
+        final byte[] data = new byte[bytesPerFile];
+
+        fs.mkdirs(partitionPath);
+
+        FSDataOutputStream out = fs.create(fullPath);
+        out.write(data);
+        out.close();
+      }
+
+      pathToPartitionInfo.put(partitionPath, partitionDesc);
+      pathToAliasTable.put(partitionPath,
+          Lists.newArrayList(partitionPath.getName()));
+      mapWork.getAliasToWork().put(partitionPath.getName(), scanOp);
+    }
+
+    mapWork.setPathToAliases(pathToAliasTable);
+    mapWork.setPathToPartitionInfo(pathToPartitionInfo);
+
+    return Utilities.getInputSummary(context, mapWork, null);
+  }
+
+  @SuppressWarnings("rawtypes")
+  static class ContentSummaryInputFormatTestClass extends FileInputFormat
+      implements ContentSummaryInputFormat {
+    private static ContentSummary summary =
+        new ContentSummary.Builder().build();
+
+    public static void setContentSummary(ContentSummary contentSummary) {
+      summary = contentSummary;
+    }
+
+    @Override
+    public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf,
+        Reporter reporter) throws IOException {
+      return null;
+    }
+
+    @Override
+    public ContentSummary getContentSummary(Path p, JobConf job)
+        throws IOException {
+      return summary;
+    }
+  }
+}
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 90eb45b..305b467 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -37,12 +37,10 @@ import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
@@ -50,20 +48,16 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.type.Timestamp;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
 import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
 import org.apache.hadoop.hive.ql.exec.tez.TezTask;
 import org.apache.hadoop.hive.ql.io.*;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -73,18 +67,12 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -540,46 +528,6 @@ public class TestUtilities {
   }
 
   @Test
-  public void testGetInputSummaryPool() throws ExecutionException, InterruptedException, IOException {
-    ExecutorService pool = mock(ExecutorService.class);
-    when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
-
-    Set<Path> pathNeedProcess = new HashSet<>();
-    pathNeedProcess.add(new Path("dummy-path1"));
-    pathNeedProcess.add(new Path("dummy-path2"));
-    pathNeedProcess.add(new Path("dummy-path3"));
-
-    SessionState.start(new HiveConf());
-    JobConf jobConf = new JobConf();
-    Context context = new Context(jobConf);
-
-    Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool);
-    verify(pool, times(3)).submit(any(Runnable.class));
-    verify(pool).shutdown();
-    verify(pool).shutdownNow();
-  }
-
-  @Test
-  public void testGetInputSummaryPoolAndFailure() throws ExecutionException, InterruptedException, IOException {
-    ExecutorService pool = mock(ExecutorService.class);
-    when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
-
-    Set<Path> pathNeedProcess = new HashSet<>();
-    pathNeedProcess.add(new Path("dummy-path1"));
-    pathNeedProcess.add(new Path("dummy-path2"));
-    pathNeedProcess.add(new Path("dummy-path3"));
-
-    SessionState.start(new HiveConf());
-    JobConf jobConf = new JobConf();
-    Context context = new Context(jobConf);
-
-    Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool);
-    verify(pool, times(3)).submit(any(Runnable.class));
-    verify(pool).shutdown();
-    verify(pool).shutdownNow();
-  }
-
-  @Test
   public void testGetInputPathsPool() throws IOException, ExecutionException, InterruptedException {
     List<Path> pathsToAdd = new ArrayList<>();
     Path path = new Path("dummy-path");
@@ -630,166 +578,6 @@ public class TestUtilities {
     verify(pool).shutdownNow();
   }
 
-  @Test
-  public void testGetInputSummaryWithASingleThread() throws IOException {
-    final int NUM_PARTITIONS = 5;
-    final int BYTES_PER_FILE = 5;
-
-    JobConf jobConf = new JobConf();
-    Properties properties = new Properties();
-
-    jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
-    ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
-    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
-    assertEquals(NUM_PARTITIONS, summary.getFileCount());
-    assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
-  }
-
-  @Test
-  public void testGetInputSummaryWithMultipleThreads() throws IOException {
-    final int NUM_PARTITIONS = 5;
-    final int BYTES_PER_FILE = 5;
-
-    JobConf jobConf = new JobConf();
-    Properties properties = new Properties();
-
-    jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
-    ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
-    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
-    assertEquals(NUM_PARTITIONS, summary.getFileCount());
-    assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
-
-    // Test deprecated mapred.dfsclient.parallelism.max
-    jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
-    jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
-    summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
-    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
-    assertEquals(NUM_PARTITIONS, summary.getFileCount());
-    assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
-  }
-
-  @Test
-  public void testGetInputSummaryWithInputEstimator() throws IOException, HiveException {
-    final int NUM_PARTITIONS = 5;
-    final int BYTES_PER_FILE = 10;
-    final int NUM_OF_ROWS = 5;
-
-    JobConf jobConf = new JobConf();
-    Properties properties = new Properties();
-
-    jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
-
-    properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName());
-    InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
-
-    /* Let's write more bytes to the files to test that Estimator is actually working returning the file size not from the filesystem */
-    ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class);
-    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
-    assertEquals(NUM_PARTITIONS * -1, summary.getFileCount());        // Current getInputSummary() returns -1 for each file found
-    assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount());   // Current getInputSummary() returns -1 for each file found
-
-    // Test deprecated mapred.dfsclient.parallelism.max
-    jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
-    jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
-
-    properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName());
-    InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
-
-    /* Let's write more bytes to the files to test that Estimator is actually working returning the file size not from the filesystem */
-    summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class);
-    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
-    assertEquals(NUM_PARTITIONS * -1, summary.getFileCount());        // Current getInputSummary() returns -1 for each file found
-    assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount());   // Current getInputSummary() returns -1 for each file found
-  }
-
-  static class ContentSummaryInputFormatTestClass extends FileInputFormat implements ContentSummaryInputFormat {
-    private static ContentSummary summary = new ContentSummary.Builder().build();
-
-    public static void setContentSummary(ContentSummary contentSummary) {
-      summary = contentSummary;
-    }
-
-    @Override
-    public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
-      return null;
-    }
-
-    @Override
-    public ContentSummary getContentSummary(Path p, JobConf job) throws IOException {
-      return summary;
-    }
-  }
-
-  @Test
-  public void testGetInputSummaryWithContentSummaryInputFormat() throws IOException {
-    final int NUM_PARTITIONS = 5;
-    final int BYTES_PER_FILE = 10;
-
-    JobConf jobConf = new JobConf();
-    Properties properties = new Properties();
-
-    jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
-
-    ContentSummaryInputFormatTestClass.setContentSummary(
-        new ContentSummary.Builder().length(BYTES_PER_FILE).fileCount(2).directoryCount(1).build());
-
-    /* Let's write more bytes to the files to test that ContentSummaryInputFormat is actually working returning the file size not from the filesystem */
-    ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class);
-    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
-    assertEquals(NUM_PARTITIONS * 2, summary.getFileCount());
-    assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
-  }
-
-  private ContentSummary runTestGetInputSummary(JobConf jobConf, Properties properties, int numOfPartitions, int bytesPerFile, Class<? extends InputFormat> inputFormatClass) throws IOException {
-    // creates scratch directories needed by the Context object
-    SessionState.start(new HiveConf());
-
-    MapWork mapWork = new MapWork();
-    Context context = new Context(jobConf);
-    LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>();
-    LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = new LinkedHashMap<>();
-    TableScanOperator scanOp = new TableScanOperator();
-
-    PartitionDesc partitionDesc = new PartitionDesc(new TableDesc(inputFormatClass, null, properties), null);
-
-    String testTableName = "testTable";
-
-    Path testTablePath = new Path(testTableName);
-    Path[] testPartitionsPaths = new Path[numOfPartitions];
-    for (int i=0; i<numOfPartitions; i++) {
-      String testPartitionName = "p=" + 1;
-      testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
-
-      pathToPartitionInfo.put(testPartitionsPaths[i], partitionDesc);
-
-      pathToAliasTable.put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));
-
-      mapWork.getAliasToWork().put(testPartitionName, scanOp);
-    }
-
-    mapWork.setPathToAliases(pathToAliasTable);
-    mapWork.setPathToPartitionInfo(pathToPartitionInfo);
-
-    FileSystem fs = FileSystem.getLocal(jobConf);
-    try {
-      fs.mkdirs(testTablePath);
-      byte[] data = new byte[bytesPerFile];
-
-      for (int i=0; i<numOfPartitions; i++) {
-        fs.mkdirs(testPartitionsPaths[i]);
-        FSDataOutputStream out = fs.create(new Path(testPartitionsPaths[i], "test1.txt"));
-        out.write(data);
-        out.close();
-      }
-
-      return Utilities.getInputSummary(context, mapWork, null);
-    } finally {
-      if (fs.exists(testTablePath)) {
-        fs.delete(testTablePath, true);
-      }
-    }
-  }
-
   private Task<? extends Serializable> getDependencyCollectionTask(){
     return TaskFactory.get(new DependencyCollectionWork());
   }