Posted to commits@hive.apache.org by pv...@apache.org on 2019/02/12 08:36:38 UTC
[hive] branch master updated: HIVE-21071: Improve getInputSummary
(BELUGA BEHR via Peter Vary)
This is an automated email from the ASF dual-hosted git repository.
pvary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 7700359 HIVE-21071: Improve getInputSummary (BELUGA BEHR via Peter Vary)
7700359 is described below
commit 77003590688512f5a62349868b62e4ed66075184
Author: BELUGA BEHR <da...@gmail.com>
AuthorDate: Tue Feb 12 09:35:34 2019 +0100
HIVE-21071: Improve getInputSummary (BELUGA BEHR via Peter Vary)
---
.../org/apache/hadoop/hive/ql/exec/Utilities.java | 145 ++++---
.../hadoop/hive/ql/exec/TestGetInputSummary.java | 443 +++++++++++++++++++++
.../apache/hadoop/hive/ql/exec/TestUtilities.java | 212 ----------
3 files changed, 524 insertions(+), 276 deletions(-)
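For readers skimming the diff: the core of the change is that per-path ContentSummary results are no longer collected into a shared ConcurrentHashMap and merged in a second pass; each worker now adds directly into three AtomicLong accumulators, and the final ContentSummary is built once via ContentSummary.Builder. A minimal sketch of the pattern (the class name and the inline, single-threaded loop are this sketch's simplifications, not the committed code):

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.ContentSummary;
    import org.apache.hadoop.fs.Path;

    public final class SummaryAccumulatorSketch {
      // Sum length/file/directory counts across paths without an
      // intermediate result map; safe for concurrent workers because the
      // accumulators are atomic.
      public static ContentSummary summarize(Configuration conf,
          Iterable<Path> paths) throws Exception {
        final AtomicLong length = new AtomicLong(0L);
        final AtomicLong files = new AtomicLong(0L);
        final AtomicLong dirs = new AtomicLong(0L);
        for (final Path path : paths) {
          // The patch runs one Runnable per path on an ExecutorService;
          // this sketch runs inline for brevity.
          ContentSummary cs = path.getFileSystem(conf).getContentSummary(path);
          length.addAndGet(cs.getLength());
          files.addAndGet(cs.getFileCount());
          dirs.addAndGet(cs.getDirectoryCount());
        }
        return new ContentSummary.Builder().length(length.get())
            .fileCount(files.get()).directoryCount(dirs.get()).build();
      }
    }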
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index b84b052..61e3430 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -18,12 +18,6 @@
package org.apache.hadoop.hive.ql.exec;
-import com.esotericsoftware.kryo.Kryo;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
import java.beans.DefaultPersistenceDelegate;
import java.beans.Encoder;
import java.beans.Expression;
@@ -53,6 +47,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
+import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -65,17 +60,18 @@ import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.InflaterInputStream;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
@@ -138,7 +134,6 @@ import org.apache.hadoop.hive.ql.io.OneNullRowInputFormat;
import org.apache.hadoop.hive.ql.io.RCFile;
import org.apache.hadoop.hive.ql.io.ReworkMapredInputFormat;
import org.apache.hadoop.hive.ql.io.SelfDescribingInputFormatInterface;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
import org.apache.hadoop.hive.ql.io.merge.MergeFileMapper;
import org.apache.hadoop.hive.ql.io.merge.MergeFileWork;
import org.apache.hadoop.hive.ql.io.rcfile.truncate.ColumnTruncateMapper;
@@ -211,9 +206,12 @@ import org.apache.hive.common.util.ReflectionUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
@@ -2435,36 +2433,32 @@ public final class Utilities {
PerfLogger perfLogger = SessionState.getPerfLogger();
perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
- long[] summary = {0, 0, 0};
-
+ final long[] summary = {0L, 0L, 0L};
final Set<Path> pathNeedProcess = new HashSet<>();
// Since multiple threads could call this method concurrently, locking
// this method keeps the number of threads from getting out of control.
synchronized (INPUT_SUMMARY_LOCK) {
// For each input path, calculate the total size.
- for (Path path : work.getPathToAliases().keySet()) {
- Path p = path;
-
- if (filter != null && !filter.accept(p)) {
+ for (final Path path : work.getPathToAliases().keySet()) {
+ if (path == null) {
+ continue;
+ }
+ if (filter != null && !filter.accept(path)) {
continue;
}
ContentSummary cs = ctx.getCS(path);
- if (cs == null) {
- if (path == null) {
- continue;
- }
- pathNeedProcess.add(path);
- } else {
+ if (cs != null) {
summary[0] += cs.getLength();
summary[1] += cs.getFileCount();
summary[2] += cs.getDirectoryCount();
+ } else {
+ pathNeedProcess.add(path);
}
}
// Process the case when name node call is needed
- final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
final ExecutorService executor;
int numExecutors = getMaxExecutorsForInputListing(ctx.getConf(), pathNeedProcess.size());
@@ -2476,17 +2470,36 @@ public final class Utilities {
} else {
executor = null;
}
- ContentSummary cs = getInputSummaryWithPool(ctx, pathNeedProcess, work, summary, executor);
+ getInputSummaryWithPool(ctx, Collections.unmodifiableSet(pathNeedProcess),
+ work, summary, executor);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.INPUT_SUMMARY);
- return cs;
}
+ return new ContentSummary.Builder().length(summary[0])
+ .fileCount(summary[1]).directoryCount(summary[2]).build();
}
+ /**
+ * Performs a ContentSummary lookup over a set of paths using one or more
+ * threads. The 'summary' argument is directly modified.
+ *
+ * @param ctx context providing the configuration and the summary cache
+ * @param pathNeedProcess the set of paths that require a lookup
+ * @param work the MapWork supplying partition and alias information
+ * @param summary three-element array (length, file count, directory count)
+ * updated in place
+ * @param executor pool to run the lookups on, or null to run them inline
+ * @throws IOException if a lookup fails
+ */
@VisibleForTesting
- static ContentSummary getInputSummaryWithPool(final Context ctx, Set<Path> pathNeedProcess, MapWork work,
- long[] summary, ExecutorService executor) throws IOException {
- List<Future<?>> results = new ArrayList<Future<?>>();
- final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
+ static void getInputSummaryWithPool(final Context ctx,
+ final Set<Path> pathNeedProcess, final MapWork work, final long[] summary,
+ final ExecutorService executor) throws IOException {
+ Preconditions.checkNotNull(ctx);
+ Preconditions.checkNotNull(pathNeedProcess);
+
+ List<Future<?>> futures = new ArrayList<Future<?>>(pathNeedProcess.size());
+ final AtomicLong totalLength = new AtomicLong(0L);
+ final AtomicLong totalFileCount = new AtomicLong(0L);
+ final AtomicLong totalDirectoryCount = new AtomicLong(0L);
HiveInterruptCallback interrup = HiveInterruptUtils.add(new HiveInterruptCallback() {
@Override
@@ -2506,9 +2519,7 @@ public final class Utilities {
try {
Configuration conf = ctx.getConf();
JobConf jobConf = new JobConf(conf);
- for (Path path : pathNeedProcess) {
- final Path p = path;
- final String pathStr = path.toString();
+ for (final Path path : pathNeedProcess) {
// All threads share the same Configuration and JobConf based on the
// assumption that they are thread safe if only read operations are
// executed. It is not stated in Hadoop's javadoc, the source codes
@@ -2519,7 +2530,7 @@ public final class Utilities {
final JobConf myJobConf = jobConf;
final Map<String, Operator<?>> aliasToWork = work.getAliasToWork();
final Map<Path, ArrayList<String>> pathToAlias = work.getPathToAliases();
- final PartitionDesc partDesc = work.getPathToPartitionInfo().get(p);
+ final PartitionDesc partDesc = work.getPathToPartitionInfo().get(path);
Runnable r = new Runnable() {
@Override
public void run() {
@@ -2529,11 +2540,11 @@ public final class Utilities {
InputFormat inputFormatObj = HiveInputFormat.getInputFormatFromCache(
inputFormatCls, myJobConf);
if (inputFormatObj instanceof ContentSummaryInputFormat) {
- ContentSummaryInputFormat cs = (ContentSummaryInputFormat) inputFormatObj;
- resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
+ ContentSummaryInputFormat csif = (ContentSummaryInputFormat) inputFormatObj;
+ final ContentSummary cs = csif.getContentSummary(path, myJobConf);
+ recordSummary(path, cs);
return;
}
-
String metaTableStorage = null;
if (partDesc.getTableDesc() != null &&
partDesc.getTableDesc().getProperties() != null) {
@@ -2550,7 +2561,7 @@ public final class Utilities {
long total = 0;
TableDesc tableDesc = partDesc.getTableDesc();
InputEstimator estimator = (InputEstimator) handler;
- for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, p)) {
+ for (String alias : HiveFileFormatUtils.doGetAliasesFromPath(pathToAlias, path)) {
JobConf jobConf = new JobConf(myJobConf);
TableScanOperator scanOp = (TableScanOperator) aliasToWork.get(alias);
Utilities.setColumnNameList(jobConf, scanOp, true);
@@ -2559,12 +2570,12 @@ public final class Utilities {
Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
}
- resultMap.put(pathStr, new ContentSummary(total, -1, -1));
+ recordSummary(path, new ContentSummary(total, -1, -1));
} else {
// todo: should nullify summary for non-native tables,
// not to be selected as a mapjoin target
- FileSystem fs = p.getFileSystem(myConf);
- resultMap.put(pathStr, fs.getContentSummary(p));
+ FileSystem fs = path.getFileSystem(myConf);
+ recordSummary(path, fs.getContentSummary(path));
}
} catch (Exception e) {
// We safely ignore this exception for summary data.
@@ -2572,28 +2583,46 @@ public final class Utilities {
// usages. The worst case is that IOException will always be
// retried for another getInputSummary(), which is fine as
// IOException is not considered as a common case.
- LOG.info("Cannot get size of {}. Safely ignored.", pathStr);
+ LOG.info("Cannot get size of {}. Safely ignored.", path);
+ LOG.debug("Cannot get size of {}. Safely ignored.", path, e);
}
}
+
+ private void recordSummary(final Path p, final ContentSummary cs) {
+ final long csLength = cs.getLength();
+ final long csFileCount = cs.getFileCount();
+ final long csDirectoryCount = cs.getDirectoryCount();
+
+ totalLength.addAndGet(csLength);
+ totalFileCount.addAndGet(csFileCount);
+ totalDirectoryCount.addAndGet(csDirectoryCount);
+
+ ctx.addCS(p.toString(), cs);
+
+ LOG.debug(
+ "Cache Content Summary for {} length: {} file count: {} "
+ + "directory count: {}",
+ path, csLength, csFileCount, csDirectoryCount);
+ }
};
if (executor == null) {
r.run();
} else {
- Future<?> result = executor.submit(r);
- results.add(result);
+ Future<?> future = executor.submit(r);
+ futures.add(future);
}
}
if (executor != null) {
- for (Future<?> result : results) {
+ for (Future<?> future : futures) {
boolean executorDone = false;
do {
try {
- result.get();
+ future.get();
executorDone = true;
} catch (InterruptedException e) {
- LOG.info("Interrupted when waiting threads: ", e);
+ LOG.info("Interrupted when waiting threads", e);
Thread.currentThread().interrupt();
break;
} catch (ExecutionException e) {
@@ -2604,22 +2633,10 @@ public final class Utilities {
executor.shutdown();
}
HiveInterruptUtils.checkInterrupted();
- for (Map.Entry<String, ContentSummary> entry : resultMap.entrySet()) {
- ContentSummary cs = entry.getValue();
-
- summary[0] += cs.getLength();
- summary[1] += cs.getFileCount();
- summary[2] += cs.getDirectoryCount();
-
- ctx.addCS(entry.getKey(), cs);
- if (LOG.isInfoEnabled()) {
- LOG.info("Cache Content Summary for {} length: {} file count: {} " +
- " directory count: {}", entry.getKey(), cs.getLength(),
- cs.getFileCount(), cs.getDirectoryCount());
- }
- }
- return new ContentSummary(summary[0], summary[1], summary[2]);
+ summary[0] += totalLength.get();
+ summary[1] += totalFileCount.get();
+ summary[2] += totalDirectoryCount.get();
} finally {
if (executor != null) {
executor.shutdownNow();
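The wait loop above keeps the old retry flag (executorDone) but now restores the interrupt status before bailing out. Reduced to its essentials, the shape is roughly the following sketch; the retry flag and HiveInterruptUtils handling are elided, and surfacing worker failures as IOException is this sketch's assumption, since the ExecutionException branch is cut off by the hunk boundary:

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;

    final class WaitForWorkersSketch {
      // Block on each submitted worker; restore the interrupt flag if
      // interrupted, and always shut the pool down at the end.
      static void waitAll(ExecutorService executor, List<Future<?>> futures)
          throws IOException {
        try {
          for (Future<?> future : futures) {
            try {
              future.get();
            } catch (InterruptedException e) {
              Thread.currentThread().interrupt();
              return;
            } catch (ExecutionException e) {
              // Assumption in this sketch: propagate worker failures.
              throw new IOException(e.getCause());
            }
          }
          executor.shutdown();
        } finally {
          executor.shutdownNow();
        }
      }
    }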
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java
new file mode 100644
index 0000000..a946b4f
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestGetInputSummary.java
@@ -0,0 +1,443 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.io.ContentSummaryInputFormat;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestGetInputSummary {
+
+ private static final String TEST_TABLE_NAME = "testTable";
+ private static final Path TEST_TABLE_PATH = new Path(TEST_TABLE_NAME);
+
+ private JobConf jobConf;
+ private Properties properties;
+
+ @Before
+ public void setup() throws Exception {
+ // creates scratch directories needed by the Context object
+ SessionState.start(new HiveConf());
+
+ this.jobConf = new JobConf();
+ this.properties = new Properties();
+
+ final FileSystem fs = FileSystem.getLocal(jobConf);
+ fs.delete(TEST_TABLE_PATH, true);
+ fs.mkdirs(TEST_TABLE_PATH);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ final FileSystem fs = FileSystem.getLocal(jobConf);
+ fs.delete(TEST_TABLE_PATH, true);
+ }
+
+ @Test
+ public void testGetInputSummaryPoolWithCache() throws Exception {
+ final int BYTES_PER_FILE = 5;
+
+ final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+ new Path("p2/test.txt"), new Path("p3/test.txt"),
+ new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+ ContentSummary cs = new ContentSummary.Builder().directoryCount(10L)
+ .fileCount(10L).length(10L).build();
+
+ Map<Path, ContentSummary> cache = new LinkedHashMap<>();
+ cache.put(new Path("p2"), cs);
+
+ jobConf.setInt(
+ HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+
+ ContentSummary summary = runTestGetInputSummary(jobConf, properties,
+ testPaths, BYTES_PER_FILE, HiveInputFormat.class, cache);
+
+ // The partition paths each contain a single file of 5 bytes; however,
+ // one entry was added to the cache claiming that its partition holds 10
+ // directories, 10 files, and 10 bytes. The cached values override the
+ // real ones because the cache is consulted before the actual file
+ // system is examined.
+ final long expectedLength = ((testPaths.size() - 1) * BYTES_PER_FILE) + 10L;
+ final long expectedFileCount = (testPaths.size() - 1) + 10L;
+ final long expectedDirCount = (testPaths.size() - 1) + 10L;
+
+ assertEquals(expectedLength, summary.getLength());
+ assertEquals(expectedFileCount, summary.getFileCount());
+ assertEquals(expectedDirCount, summary.getDirectoryCount());
+ }
+
+ /**
+ * Read several files so that their information is cached, then delete those
+ * files and read them again to see if the results were cached from the first
+ * read. If the cache is not working, the sizes will be off since the files no
+ * longer exist in the file system.
+ *
+ * @throws Exception on failure
+ */
+ @Test
+ public void testGetInputSummaryPoolWithCacheReuse() throws Exception {
+ final int BYTES_PER_FILE = 5;
+
+ final Collection<Path> testPaths1 = Arrays.asList(new Path("p1/test.txt"),
+ new Path("p2/test.txt"), new Path("p3/test.txt"));
+ final Collection<Path> testPaths2 =
+ Arrays.asList(new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+ jobConf.setInt(
+ HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+
+ ContentSummary summary =
+ runTestGetInputSummary(jobConf, properties, testPaths1, BYTES_PER_FILE,
+ HiveInputFormat.class, Collections.emptyMap());
+
+ // Ensure the first group of files were read correctly
+ assertEquals(testPaths1.size() * BYTES_PER_FILE, summary.getLength());
+ assertEquals(testPaths1.size(), summary.getFileCount());
+ assertEquals(testPaths1.size(), summary.getDirectoryCount());
+
+ // Delete the files from the first group
+ final FileSystem fs = FileSystem.getLocal(jobConf);
+ for (final Path path : testPaths1) {
+ fs.delete(path, true);
+ }
+
+ // Read all the files and the first group's stats should be pulled from
+ // cache and the second group from the file system
+ summary = runTestGetInputSummary(jobConf, properties,
+ CollectionUtils.union(testPaths1, testPaths2), BYTES_PER_FILE,
+ new HashSet<>(testPaths1), HiveInputFormat.class,
+ Collections.emptyMap());
+
+ assertEquals((testPaths1.size() + testPaths2.size()) * BYTES_PER_FILE,
+ summary.getLength());
+ assertEquals((testPaths1.size() + testPaths2.size()),
+ summary.getFileCount());
+ assertEquals((testPaths1.size() + testPaths2.size()),
+ summary.getDirectoryCount());
+ }
+
+ @Test
+ @SuppressWarnings("deprecation")
+ public void testGetInputSummaryWithMultipleThreads() throws IOException {
+ final int BYTES_PER_FILE = 5;
+
+ final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+ new Path("p2/test.txt"), new Path("p3/test.txt"),
+ new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+ jobConf.setInt(
+ HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
+ ContentSummary summary =
+ runTestGetInputSummary(jobConf, properties, testPaths, BYTES_PER_FILE,
+ HiveInputFormat.class, Collections.emptyMap());
+ assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+ assertEquals(testPaths.size(), summary.getFileCount());
+ assertEquals(testPaths.size(), summary.getDirectoryCount());
+
+ // Test deprecated mapred.dfsclient.parallelism.max
+ jobConf.setInt(
+ HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+ jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
+ summary = runTestGetInputSummary(jobConf, properties, testPaths,
+ BYTES_PER_FILE, HiveInputFormat.class, Collections.emptyMap());
+ assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+ assertEquals(testPaths.size(), summary.getFileCount());
+ assertEquals(testPaths.size(), summary.getDirectoryCount());
+ }
+
+ @Test
+ public void testGetInputSummaryWithInputEstimator()
+ throws IOException, HiveException {
+ final int BYTES_PER_FILE = 10;
+ final int NUM_OF_ROWS = 5;
+
+ final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+ new Path("p2/test.txt"), new Path("p3/test.txt"),
+ new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+ jobConf.setInt(
+ HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
+
+ properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE,
+ InputEstimatorTestClass.class.getName());
+ InputEstimatorTestClass.setEstimation(
+ new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
+
+ /*
+ * Write more bytes to the files than the estimate to verify that the
+ * Estimator is actually consulted and the size is not read from the
+ * filesystem
+ */
+ ContentSummary summary =
+ runTestGetInputSummary(jobConf, properties, testPaths,
+ BYTES_PER_FILE * 2, HiveInputFormat.class, Collections.emptyMap());
+ assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+
+ // Current getInputSummary() returns -1 for each file found
+ assertEquals(testPaths.size() * -1, summary.getFileCount());
+
+ // Current getInputSummary() returns -1 for each file found
+ assertEquals(testPaths.size() * -1, summary.getDirectoryCount());
+
+ // Test deprecated mapred.dfsclient.parallelism.max
+ jobConf.setInt(
+ HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+ jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
+
+ properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE,
+ InputEstimatorTestClass.class.getName());
+ InputEstimatorTestClass.setEstimation(
+ new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
+
+ /*
+ * Write more bytes to the files than the estimate to verify that the
+ * Estimator is actually consulted and the size is not read from the
+ * filesystem
+ */
+ summary = runTestGetInputSummary(jobConf, properties, testPaths,
+ BYTES_PER_FILE * 2, HiveInputFormat.class, Collections.emptyMap());
+ assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+
+ // Current getInputSummary() returns -1 for each file found
+ assertEquals(testPaths.size() * -1, summary.getFileCount());
+
+ // Current getInputSummary() returns -1 for each file found
+ assertEquals(testPaths.size() * -1, summary.getDirectoryCount());
+ }
+
+ @Test
+ public void testGetInputSummaryWithASingleThread() throws IOException {
+ final int BYTES_PER_FILE = 5;
+
+ // Set to zero threads to disable thread pool
+ jobConf.setInt(
+ HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
+
+ final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+ new Path("p2/test.txt"), new Path("p3/test.txt"),
+ new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+ ContentSummary summary =
+ runTestGetInputSummary(jobConf, properties, testPaths, BYTES_PER_FILE,
+ HiveInputFormat.class, Collections.emptyMap());
+ assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+ assertEquals(testPaths.size(), summary.getFileCount());
+ assertEquals(testPaths.size(), summary.getDirectoryCount());
+ }
+
+ @Test
+ public void testGetInputSummaryWithContentSummaryInputFormat()
+ throws IOException {
+ final int BYTES_PER_FILE = 10;
+
+ final Collection<Path> testPaths = Arrays.asList(new Path("p1/test.txt"),
+ new Path("p2/test.txt"), new Path("p3/test.txt"),
+ new Path("p4/test.txt"), new Path("p5/test.txt"));
+
+ jobConf.setInt(ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
+
+ ContentSummaryInputFormatTestClass
+ .setContentSummary(new ContentSummary.Builder().length(BYTES_PER_FILE)
+ .fileCount(2).directoryCount(1).build());
+
+ /*
+ * Write more bytes to the files than the configured summary to verify
+ * that ContentSummaryInputFormat is actually consulted and the size is
+ * not read from the filesystem
+ */
+ ContentSummary summary = runTestGetInputSummary(jobConf, properties,
+ testPaths, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class,
+ Collections.emptyMap());
+ assertEquals(testPaths.size() * BYTES_PER_FILE, summary.getLength());
+ assertEquals(testPaths.size() * 2, summary.getFileCount());
+ assertEquals(testPaths.size(), summary.getDirectoryCount());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testGetInputSummaryPool()
+ throws ExecutionException, InterruptedException, IOException {
+ ExecutorService pool = mock(ExecutorService.class);
+ when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+
+ Set<Path> pathNeedProcess = new HashSet<>();
+ pathNeedProcess.add(new Path("dummy-path1"));
+ pathNeedProcess.add(new Path("dummy-path2"));
+ pathNeedProcess.add(new Path("dummy-path3"));
+
+ Context context = new Context(jobConf);
+
+ Utilities.getInputSummaryWithPool(context, pathNeedProcess,
+ mock(MapWork.class), new long[3], pool);
+ verify(pool, times(3)).submit(any(Runnable.class));
+ verify(pool).shutdown();
+ verify(pool).shutdownNow();
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testGetInputSummaryPoolAndFailure()
+ throws ExecutionException, InterruptedException, IOException {
+ ExecutorService pool = mock(ExecutorService.class);
+ when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
+
+ Set<Path> pathNeedProcess = new HashSet<>();
+ pathNeedProcess.add(new Path("dummy-path1"));
+ pathNeedProcess.add(new Path("dummy-path2"));
+ pathNeedProcess.add(new Path("dummy-path3"));
+
+ Context context = new Context(jobConf);
+
+ Utilities.getInputSummaryWithPool(context, pathNeedProcess,
+ mock(MapWork.class), new long[3], pool);
+ verify(pool, times(3)).submit(any(Runnable.class));
+ verify(pool).shutdown();
+ verify(pool).shutdownNow();
+ }
+
+ @SuppressWarnings("rawtypes")
+ private ContentSummary runTestGetInputSummary(JobConf jobConf,
+ Properties properties, Collection<Path> testPaths, int bytesPerFile,
+ Class<? extends InputFormat> inputFormatClass,
+ Map<Path, ContentSummary> cache) throws IOException {
+ return runTestGetInputSummary(jobConf, properties, testPaths, bytesPerFile,
+ Collections.emptyList(), inputFormatClass, cache);
+ }
+
+ @SuppressWarnings("rawtypes")
+ private ContentSummary runTestGetInputSummary(JobConf jobConf,
+ Properties properties, Collection<Path> testPaths, int bytesPerFile,
+ Collection<Path> providedPaths,
+ Class<? extends InputFormat> inputFormatClass,
+ Map<Path, ContentSummary> cache) throws IOException {
+
+ final FileSystem fs = FileSystem.getLocal(jobConf);
+
+ MapWork mapWork = new MapWork();
+ Context context = new Context(jobConf);
+
+ for (Map.Entry<Path, ContentSummary> entry : cache.entrySet()) {
+ final Path partitionPath = new Path(TEST_TABLE_PATH, entry.getKey());
+ context.addCS(partitionPath.toString(), entry.getValue());
+ }
+
+ LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo =
+ new LinkedHashMap<>();
+ LinkedHashMap<Path, ArrayList<String>> pathToAliasTable =
+ new LinkedHashMap<>();
+ TableScanOperator scanOp = new TableScanOperator();
+
+ PartitionDesc partitionDesc = new PartitionDesc(
+ new TableDesc(inputFormatClass, null, properties), null);
+
+ for (final Path path : testPaths) {
+ final Path fullPath = new Path(TEST_TABLE_PATH, path);
+ final Path partitionPath = fullPath.getParent();
+
+ // If it is not provided by the test case, create a dummy file
+ if (!providedPaths.contains(path)) {
+ final byte[] data = new byte[bytesPerFile];
+
+ fs.mkdirs(partitionPath);
+
+ FSDataOutputStream out = fs.create(fullPath);
+ out.write(data);
+ out.close();
+ }
+
+ pathToPartitionInfo.put(partitionPath, partitionDesc);
+ pathToAliasTable.put(partitionPath,
+ Lists.newArrayList(partitionPath.getName()));
+ mapWork.getAliasToWork().put(partitionPath.getName(), scanOp);
+ }
+
+ mapWork.setPathToAliases(pathToAliasTable);
+ mapWork.setPathToPartitionInfo(pathToPartitionInfo);
+
+ return Utilities.getInputSummary(context, mapWork, null);
+ }
+
+ @SuppressWarnings("rawtypes")
+ static class ContentSummaryInputFormatTestClass extends FileInputFormat
+ implements ContentSummaryInputFormat {
+ private static ContentSummary summary =
+ new ContentSummary.Builder().build();
+
+ public static void setContentSummary(ContentSummary contentSummary) {
+ summary = contentSummary;
+ }
+
+ @Override
+ public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf,
+ Reporter reporter) throws IOException {
+ return null;
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path p, JobConf job)
+ throws IOException {
+ return summary;
+ }
+ }
+}
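For context, a caller of the refactored API needs nothing beyond a Context and a populated MapWork; the summary cache and the thread pool are handled internally. A hypothetical usage sketch mirroring what runTestGetInputSummary() above does (a null filter means every path is summarized; the class and method names are illustrative):

    import org.apache.hadoop.fs.ContentSummary;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.Context;
    import org.apache.hadoop.hive.ql.exec.Utilities;
    import org.apache.hadoop.hive.ql.plan.MapWork;
    import org.apache.hadoop.hive.ql.session.SessionState;
    import org.apache.hadoop.mapred.JobConf;

    final class GetInputSummarySketch {
      // Returns the total input length for the given MapWork.
      static long totalInputBytes(MapWork mapWork) throws Exception {
        SessionState.start(new HiveConf()); // scratch dirs for the Context
        Context ctx = new Context(new JobConf());
        ContentSummary cs = Utilities.getInputSummary(ctx, mapWork, null);
        return cs.getLength();
      }
    }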
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index 90eb45b..305b467 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -37,12 +37,10 @@ import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Set;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -50,20 +48,16 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.Timestamp;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.exec.mr.ExecDriver;
import org.apache.hadoop.hive.ql.exec.spark.SparkTask;
import org.apache.hadoop.hive.ql.exec.tez.TezTask;
import org.apache.hadoop.hive.ql.io.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.DependencyCollectionWork;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
@@ -73,18 +67,12 @@ import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.InputFormat;
-import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.mapred.RecordReader;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -540,46 +528,6 @@ public class TestUtilities {
}
@Test
- public void testGetInputSummaryPool() throws ExecutionException, InterruptedException, IOException {
- ExecutorService pool = mock(ExecutorService.class);
- when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
-
- Set<Path> pathNeedProcess = new HashSet<>();
- pathNeedProcess.add(new Path("dummy-path1"));
- pathNeedProcess.add(new Path("dummy-path2"));
- pathNeedProcess.add(new Path("dummy-path3"));
-
- SessionState.start(new HiveConf());
- JobConf jobConf = new JobConf();
- Context context = new Context(jobConf);
-
- Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool);
- verify(pool, times(3)).submit(any(Runnable.class));
- verify(pool).shutdown();
- verify(pool).shutdownNow();
- }
-
- @Test
- public void testGetInputSummaryPoolAndFailure() throws ExecutionException, InterruptedException, IOException {
- ExecutorService pool = mock(ExecutorService.class);
- when(pool.submit(any(Runnable.class))).thenReturn(mock(Future.class));
-
- Set<Path> pathNeedProcess = new HashSet<>();
- pathNeedProcess.add(new Path("dummy-path1"));
- pathNeedProcess.add(new Path("dummy-path2"));
- pathNeedProcess.add(new Path("dummy-path3"));
-
- SessionState.start(new HiveConf());
- JobConf jobConf = new JobConf();
- Context context = new Context(jobConf);
-
- Utilities.getInputSummaryWithPool(context, pathNeedProcess, mock(MapWork.class), new long[3], pool);
- verify(pool, times(3)).submit(any(Runnable.class));
- verify(pool).shutdown();
- verify(pool).shutdownNow();
- }
-
- @Test
public void testGetInputPathsPool() throws IOException, ExecutionException, InterruptedException {
List<Path> pathsToAdd = new ArrayList<>();
Path path = new Path("dummy-path");
@@ -630,166 +578,6 @@ public class TestUtilities {
verify(pool).shutdownNow();
}
- @Test
- public void testGetInputSummaryWithASingleThread() throws IOException {
- final int NUM_PARTITIONS = 5;
- final int BYTES_PER_FILE = 5;
-
- JobConf jobConf = new JobConf();
- Properties properties = new Properties();
-
- jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
- ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
- assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
- assertEquals(NUM_PARTITIONS, summary.getFileCount());
- assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
- }
-
- @Test
- public void testGetInputSummaryWithMultipleThreads() throws IOException {
- final int NUM_PARTITIONS = 5;
- final int BYTES_PER_FILE = 5;
-
- JobConf jobConf = new JobConf();
- Properties properties = new Properties();
-
- jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
- ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
- assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
- assertEquals(NUM_PARTITIONS, summary.getFileCount());
- assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
-
- // Test deprecated mapred.dfsclient.parallelism.max
- jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
- jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
- summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
- assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
- assertEquals(NUM_PARTITIONS, summary.getFileCount());
- assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
- }
-
- @Test
- public void testGetInputSummaryWithInputEstimator() throws IOException, HiveException {
- final int NUM_PARTITIONS = 5;
- final int BYTES_PER_FILE = 10;
- final int NUM_OF_ROWS = 5;
-
- JobConf jobConf = new JobConf();
- Properties properties = new Properties();
-
- jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
-
- properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName());
- InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
-
- /* Let's write more bytes to the files to test that Estimator is actually working returning the file size not from the filesystem */
- ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class);
- assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
- assertEquals(NUM_PARTITIONS * -1, summary.getFileCount()); // Current getInputSummary() returns -1 for each file found
- assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // Current getInputSummary() returns -1 for each file found
-
- // Test deprecated mapred.dfsclient.parallelism.max
- jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 0);
- jobConf.setInt(HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname, 2);
-
- properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName());
- InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
-
- /* Let's write more bytes to the files to test that Estimator is actually working returning the file size not from the filesystem */
- summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class);
- assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
- assertEquals(NUM_PARTITIONS * -1, summary.getFileCount()); // Current getInputSummary() returns -1 for each file found
- assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // Current getInputSummary() returns -1 for each file found
- }
-
- static class ContentSummaryInputFormatTestClass extends FileInputFormat implements ContentSummaryInputFormat {
- private static ContentSummary summary = new ContentSummary.Builder().build();
-
- public static void setContentSummary(ContentSummary contentSummary) {
- summary = contentSummary;
- }
-
- @Override
- public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
- return null;
- }
-
- @Override
- public ContentSummary getContentSummary(Path p, JobConf job) throws IOException {
- return summary;
- }
- }
-
- @Test
- public void testGetInputSummaryWithContentSummaryInputFormat() throws IOException {
- final int NUM_PARTITIONS = 5;
- final int BYTES_PER_FILE = 10;
-
- JobConf jobConf = new JobConf();
- Properties properties = new Properties();
-
- jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX, 2);
-
- ContentSummaryInputFormatTestClass.setContentSummary(
- new ContentSummary.Builder().length(BYTES_PER_FILE).fileCount(2).directoryCount(1).build());
-
- /* Let's write more bytes to the files to test that ContentSummaryInputFormat is actually working returning the file size not from the filesystem */
- ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class);
- assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
- assertEquals(NUM_PARTITIONS * 2, summary.getFileCount());
- assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
- }
-
- private ContentSummary runTestGetInputSummary(JobConf jobConf, Properties properties, int numOfPartitions, int bytesPerFile, Class<? extends InputFormat> inputFormatClass) throws IOException {
- // creates scratch directories needed by the Context object
- SessionState.start(new HiveConf());
-
- MapWork mapWork = new MapWork();
- Context context = new Context(jobConf);
- LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>();
- LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = new LinkedHashMap<>();
- TableScanOperator scanOp = new TableScanOperator();
-
- PartitionDesc partitionDesc = new PartitionDesc(new TableDesc(inputFormatClass, null, properties), null);
-
- String testTableName = "testTable";
-
- Path testTablePath = new Path(testTableName);
- Path[] testPartitionsPaths = new Path[numOfPartitions];
- for (int i=0; i<numOfPartitions; i++) {
- String testPartitionName = "p=" + 1;
- testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
-
- pathToPartitionInfo.put(testPartitionsPaths[i], partitionDesc);
-
- pathToAliasTable.put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));
-
- mapWork.getAliasToWork().put(testPartitionName, scanOp);
- }
-
- mapWork.setPathToAliases(pathToAliasTable);
- mapWork.setPathToPartitionInfo(pathToPartitionInfo);
-
- FileSystem fs = FileSystem.getLocal(jobConf);
- try {
- fs.mkdirs(testTablePath);
- byte[] data = new byte[bytesPerFile];
-
- for (int i=0; i<numOfPartitions; i++) {
- fs.mkdirs(testPartitionsPaths[i]);
- FSDataOutputStream out = fs.create(new Path(testPartitionsPaths[i], "test1.txt"));
- out.write(data);
- out.close();
- }
-
- return Utilities.getInputSummary(context, mapWork, null);
- } finally {
- if (fs.exists(testTablePath)) {
- fs.delete(testTablePath, true);
- }
- }
- }
-
private Task<? extends Serializable> getDependencyCollectionTask(){
return TaskFactory.get(new DependencyCollectionWork());
}
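As the migrated tests show, the size of the listing pool comes from HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS, with the deprecated mapred.dfsclient.parallelism.max constant still honored as a fallback. A minimal configuration sketch, placed in the same package as the tests so the Utilities constant is visible (assumption: the constant keeps the visibility the tests rely on; zero disables the pool and runs lookups on the calling thread):

    package org.apache.hadoop.hive.ql.exec;

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.mapred.JobConf;

    final class ListingThreadsConfigSketch {
      static JobConf configure(int threads) {
        JobConf jobConf = new JobConf();
        // Primary knob; 0 disables the pool (single-threaded listing).
        jobConf.setInt(
            HiveConf.ConfVars.HIVE_EXEC_INPUT_LISTING_MAX_THREADS.varname,
            threads);
        // Deprecated fallback, consulted when the primary knob is 0.
        jobConf.setInt(Utilities.DEPRECATED_MAPRED_DFSCLIENT_PARALLELISM_MAX,
            threads);
        return jobConf;
      }
    }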