Posted to commits@hive.apache.org by se...@apache.org on 2017/02/02 02:03:30 UTC
[31/50] [abbrv] hive git commit: HIVE-15736: Add unit tests to Utilities.getInputSummary() method for multi-threading cases (Sergio Pena, reviewed by Chaoyu Tang)
HIVE-15736: Add unit tests to Utilities.getInputSummary() method for multi-threading cases (Sergio Pena, reviewed by Chaoyu Tang)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0c3d5da7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0c3d5da7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0c3d5da7
Branch: refs/heads/hive-14535
Commit: 0c3d5da71d221a0a9792118bd780f38149822a89
Parents: 4becd68
Author: Sergio Pena <se...@cloudera.com>
Authored: Wed Feb 1 09:53:14 2017 -0600
Committer: Sergio Pena <se...@cloudera.com>
Committed: Wed Feb 1 09:53:14 2017 -0600
----------------------------------------------------------------------
.../apache/hadoop/hive/ql/exec/Utilities.java | 42 +++---
.../hive/ql/exec/InputEstimatorTestClass.java | 106 +++++++++++++
.../hadoop/hive/ql/exec/TestUtilities.java | 151 ++++++++++++++++++-
3 files changed, 279 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/0c3d5da7/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 68dd5e7..12a03d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -176,7 +176,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
-import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
@@ -195,9 +194,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.zip.Deflater;
@@ -2106,7 +2102,7 @@ public final class Utilities {
long[] summary = {0, 0, 0};
- final List<Path> pathNeedProcess = new ArrayList<>();
+ final Set<Path> pathNeedProcess = new HashSet<>();
// Since multiple threads could call this method concurrently, locking
// this method will avoid number of threads out of control.
@@ -2135,13 +2131,14 @@ public final class Utilities {
// Process the case when name node call is needed
final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
ArrayList<Future<?>> results = new ArrayList<Future<?>>();
- final ThreadPoolExecutor executor;
+ final ExecutorService executor;
int maxThreads = ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0);
if (pathNeedProcess.size() > 1 && maxThreads > 1) {
int numExecutors = Math.min(pathNeedProcess.size(), maxThreads);
LOG.info("Using " + numExecutors + " threads for getContentSummary");
- executor = new ThreadPoolExecutor(numExecutors, numExecutors, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>());
+ executor = Executors.newFixedThreadPool(numExecutors,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("Get-Input-Summary-%d").build());
} else {
executor = null;
}
@@ -2191,11 +2188,19 @@ public final class Utilities {
resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
return;
}
- HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf,
- SerDeUtils.createOverlayedProperties(
- partDesc.getTableDesc().getProperties(),
- partDesc.getProperties())
- .getProperty(hive_metastoreConstants.META_TABLE_STORAGE));
+
+ String metaTableStorage = null;
+ if (partDesc.getTableDesc() != null &&
+ partDesc.getTableDesc().getProperties() != null) {
+ metaTableStorage = partDesc.getTableDesc().getProperties()
+ .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null);
+ }
+ if (partDesc.getProperties() != null) {
+ metaTableStorage = partDesc.getProperties()
+ .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage);
+ }
+
+ HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage);
if (handler instanceof InputEstimator) {
long total = 0;
TableDesc tableDesc = partDesc.getTableDesc();
@@ -2207,14 +2212,15 @@ public final class Utilities {
Utilities.setColumnTypeList(jobConf, scanOp, true);
PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
- total += estimator.estimate(myJobConf, scanOp, -1).getTotalLength();
+ total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
}
resultMap.put(pathStr, new ContentSummary(total, -1, -1));
+ } else {
+ // todo: should nullify summary for non-native tables,
+ // not to be selected as a mapjoin target
+ FileSystem fs = p.getFileSystem(myConf);
+ resultMap.put(pathStr, fs.getContentSummary(p));
}
- // todo: should nullify summary for non-native tables,
- // not to be selected as a mapjoin target
- FileSystem fs = p.getFileSystem(myConf);
- resultMap.put(pathStr, fs.getContentSummary(p));
} catch (Exception e) {
// We safely ignore this exception for summary data.
// We don't update the cache to protect it from polluting other
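For readers skimming the hunk above: the patch swaps the hand-rolled ThreadPoolExecutor for Executors.newFixedThreadPool backed by Guava's ThreadFactoryBuilder, so the getContentSummary worker threads are daemonized and carry readable names. A minimal standalone sketch of that pattern follows; the class name, pool size, and submitted task are illustrative only and are not part of the patch.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;

public class DaemonPoolSketch {
  public static void main(String[] args) throws InterruptedException {
    // In the patch, numExecutors = Math.min(pathNeedProcess.size(), maxThreads).
    int numExecutors = 4;
    ExecutorService executor = Executors.newFixedThreadPool(numExecutors,
        new ThreadFactoryBuilder()
            .setDaemon(true)                        // daemon threads do not block JVM shutdown
            .setNameFormat("Get-Input-Summary-%d")  // same name format the patch uses
            .build());
    executor.submit(() -> System.out.println(Thread.currentThread().getName()));
    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
  }
}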
http://git-wip-us.apache.org/repos/asf/hive/blob/0c3d5da7/ql/src/test/org/apache/hadoop/hive/ql/exec/InputEstimatorTestClass.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/InputEstimatorTestClass.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/InputEstimatorTestClass.java
new file mode 100644
index 0000000..8c52979
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/InputEstimatorTestClass.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+
+import java.util.Map;
+
+/**
+ * This is just a helper class to test the InputEstimator object used in some Utilities methods.
+ */
+public class InputEstimatorTestClass implements HiveStorageHandler, InputEstimator {
+ private static Estimation expectedEstimation = new Estimation(0, 0);
+
+ public InputEstimatorTestClass() {
+ }
+
+ public static void setEstimation(Estimation estimation) {
+ expectedEstimation = estimation;
+ }
+
+ @Override
+ public Class<? extends InputFormat> getInputFormatClass() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends OutputFormat> getOutputFormatClass() {
+ return null;
+ }
+
+ @Override
+ public Class<? extends AbstractSerDe> getSerDeClass() {
+ return null;
+ }
+
+ @Override
+ public HiveMetaHook getMetaHook() {
+ return null;
+ }
+
+ @Override
+ public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+ return null;
+ }
+
+ @Override
+ public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+ }
+
+ @Override
+ public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+ }
+
+ @Override
+ public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+ }
+
+ @Override
+ public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+
+ }
+
+ @Override
+ public Configuration getConf() {
+ return null;
+ }
+
+ @Override
+ public Estimation estimate(JobConf job, TableScanOperator ts, long remaining) throws HiveException {
+ return expectedEstimation;
+ }
+}
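As a usage note on the helper above (mirroring what the new tests below do, with illustrative values): pointing a table's META_TABLE_STORAGE property at InputEstimatorTestClass makes Utilities.getInputSummary() resolve it as the storage handler and take the InputEstimator path, so the reported size comes from the stubbed Estimation rather than from the filesystem. The sketch class and method below are hypothetical, not part of the patch.

import java.util.Properties;
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.InputEstimatorTestClass;
import org.apache.hadoop.hive.ql.metadata.InputEstimator;

public class EstimatorWiringSketch {
  // Returns table properties that route size estimation through InputEstimatorTestClass.
  public static Properties estimatorTableProperties(int rows, int bytes) {
    Properties props = new Properties();
    props.setProperty(hive_metastoreConstants.META_TABLE_STORAGE,
        InputEstimatorTestClass.class.getName());
    // The helper hands this fixed estimation back from estimate(), regardless of what is on disk.
    InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(rows, bytes));
    return props;
  }
}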
http://git-wip-us.apache.org/repos/asf/hive/blob/0c3d5da7/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index e444946..5a9d83c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -37,13 +37,16 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.*;
import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -57,7 +60,12 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@@ -353,4 +361,143 @@ public class TestUtilities {
}
}
}
+
+ @Test
+ public void testGetInputSummaryWithASingleThread() throws IOException {
+ final int NUM_PARTITIONS = 5;
+ final int BYTES_PER_FILE = 5;
+
+ JobConf jobConf = new JobConf();
+ Properties properties = new Properties();
+
+ jobConf.setInt("mapred.dfsclient.parallelism.max", 0);
+ ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
+ assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
+ assertEquals(NUM_PARTITIONS, summary.getFileCount());
+ assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
+ }
+
+ @Test
+ public void testGetInputSummaryWithMultipleThreads() throws IOException {
+ final int NUM_PARTITIONS = 5;
+ final int BYTES_PER_FILE = 5;
+
+ JobConf jobConf = new JobConf();
+ Properties properties = new Properties();
+
+ jobConf.setInt("mapred.dfsclient.parallelism.max", 2);
+ ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
+ assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
+ assertEquals(NUM_PARTITIONS, summary.getFileCount());
+ assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
+ }
+
+ @Test
+ public void testGetInputSummaryWithInputEstimator() throws IOException, HiveException {
+ final int NUM_PARTITIONS = 5;
+ final int BYTES_PER_FILE = 10;
+ final int NUM_OF_ROWS = 5;
+
+ JobConf jobConf = new JobConf();
+ Properties properties = new Properties();
+
+ jobConf.setInt("mapred.dfsclient.parallelism.max", 2);
+
+ properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName());
+ InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
+
+    /* Write more bytes to the files than the estimator reports, to verify the size comes from the Estimator and not from the filesystem */
+ ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class);
+ assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
+    assertEquals(NUM_PARTITIONS * -1, summary.getFileCount());      // getInputSummary() reports -1 files for each partition handled by the estimator
+    assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount()); // getInputSummary() reports -1 directories for each partition handled by the estimator
+ }
+
+ static class ContentSummaryInputFormatTestClass extends FileInputFormat implements ContentSummaryInputFormat {
+ private static ContentSummary summary = new ContentSummary.Builder().build();
+
+ public static void setContentSummary(ContentSummary contentSummary) {
+ summary = contentSummary;
+ }
+
+ @Override
+ public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+ return null;
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path p, JobConf job) throws IOException {
+ return summary;
+ }
+ }
+
+ @Test
+ public void testGetInputSummaryWithContentSummaryInputFormat() throws IOException {
+ final int NUM_PARTITIONS = 5;
+ final int BYTES_PER_FILE = 10;
+
+ JobConf jobConf = new JobConf();
+ Properties properties = new Properties();
+
+ jobConf.setInt("mapred.dfsclient.parallelism.max", 2);
+
+ ContentSummaryInputFormatTestClass.setContentSummary(
+ new ContentSummary.Builder().length(BYTES_PER_FILE).fileCount(2).directoryCount(1).build());
+
+    /* Write more bytes to the files than the ContentSummaryInputFormat reports, to verify the summary comes from the input format and not from the filesystem */
+ ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class);
+ assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
+ assertEquals(NUM_PARTITIONS * 2, summary.getFileCount());
+ assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
+ }
+
+ private ContentSummary runTestGetInputSummary(JobConf jobConf, Properties properties, int numOfPartitions, int bytesPerFile, Class<? extends InputFormat> inputFormatClass) throws IOException {
+ // creates scratch directories needed by the Context object
+ SessionState.start(new HiveConf());
+
+ MapWork mapWork = new MapWork();
+ Context context = new Context(jobConf);
+ LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>();
+ LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = new LinkedHashMap<>();
+ TableScanOperator scanOp = new TableScanOperator();
+
+ PartitionDesc partitionDesc = new PartitionDesc(new TableDesc(inputFormatClass, null, properties), null);
+
+ String testTableName = "testTable";
+
+ Path testTablePath = new Path(testTableName);
+ Path[] testPartitionsPaths = new Path[numOfPartitions];
+ for (int i=0; i<numOfPartitions; i++) {
+      String testPartitionName = "p=" + i;
+ testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
+
+ pathToPartitionInfo.put(testPartitionsPaths[i], partitionDesc);
+
+ pathToAliasTable.put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));
+
+ mapWork.getAliasToWork().put(testPartitionName, scanOp);
+ }
+
+ mapWork.setPathToAliases(pathToAliasTable);
+ mapWork.setPathToPartitionInfo(pathToPartitionInfo);
+
+ FileSystem fs = FileSystem.getLocal(jobConf);
+ try {
+ fs.mkdirs(testTablePath);
+ byte[] data = new byte[bytesPerFile];
+
+ for (int i=0; i<numOfPartitions; i++) {
+ fs.mkdirs(testPartitionsPaths[i]);
+ FSDataOutputStream out = fs.create(new Path(testPartitionsPaths[i], "test1.txt"));
+ out.write(data);
+ out.close();
+ }
+
+ return Utilities.getInputSummary(context, mapWork, null);
+ } finally {
+ if (fs.exists(testTablePath)) {
+ fs.delete(testTablePath, true);
+ }
+ }
+ }
}