Posted to commits@hive.apache.org by se...@apache.org on 2017/02/02 02:03:30 UTC

[31/50] [abbrv] hive git commit: HIVE-15736: Add unit tests to Utilities.getInputSummary() method for multi-threading cases (Sergio Pena, reviewed by Chaoyu Tang)

HIVE-15736: Add unit tests to Utilities.getInputSummary() method for multi-threading cases (Sergio Pena, reviewed by Chaoyu Tang)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0c3d5da7
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0c3d5da7
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0c3d5da7

Branch: refs/heads/hive-14535
Commit: 0c3d5da71d221a0a9792118bd780f38149822a89
Parents: 4becd68
Author: Sergio Pena <se...@cloudera.com>
Authored: Wed Feb 1 09:53:14 2017 -0600
Committer: Sergio Pena <se...@cloudera.com>
Committed: Wed Feb 1 09:53:14 2017 -0600

----------------------------------------------------------------------
 .../apache/hadoop/hive/ql/exec/Utilities.java   |  42 +++---
 .../hive/ql/exec/InputEstimatorTestClass.java   | 106 +++++++++++++
 .../hadoop/hive/ql/exec/TestUtilities.java      | 151 ++++++++++++++++++-
 3 files changed, 279 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
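
A note for readers skimming the patch: besides the new tests, the production change in Utilities.getInputSummary() swaps the hand-built ThreadPoolExecutor for a fixed-size pool of named daemon threads, makes the META_TABLE_STORAGE lookup null-safe instead of overlaying possibly-null property sets, and moves the plain FileSystem.getContentSummary() fallback into an else branch so it no longer overwrites a summary already produced by an InputEstimator. A minimal sketch of the pool construction the patch adopts follows; Guava's ThreadFactoryBuilder is already on Hive's classpath, and the class and method names here are illustrative only:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;
    import com.google.common.util.concurrent.ThreadFactoryBuilder;

    public class InputSummaryPoolSketch {
      // Fixed-size pool of named daemon threads, capped at the number of paths
      // that actually require a name node call.
      static ExecutorService newSummaryPool(int pathsNeedingProcess, int maxThreads) {
        int numExecutors = Math.min(pathsNeedingProcess, maxThreads);
        ThreadFactory factory = new ThreadFactoryBuilder()
            .setDaemon(true)
            .setNameFormat("Get-Input-Summary-%d")
            .build();
        return Executors.newFixedThreadPool(numExecutors, factory);
      }
    }

Daemon threads keep a slow getContentSummary() call from blocking JVM shutdown, and the name format makes the workers easy to identify in a thread dump.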


http://git-wip-us.apache.org/repos/asf/hive/blob/0c3d5da7/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
index 68dd5e7..12a03d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
@@ -176,7 +176,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Calendar;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -195,9 +194,6 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.zip.Deflater;
@@ -2106,7 +2102,7 @@ public final class Utilities {
 
     long[] summary = {0, 0, 0};
 
-    final List<Path> pathNeedProcess = new ArrayList<>();
+    final Set<Path> pathNeedProcess = new HashSet<>();
 
     // Since multiple threads could call this method concurrently, locking
     // this method will avoid number of threads out of control.
@@ -2135,13 +2131,14 @@ public final class Utilities {
       // Process the case when name node call is needed
       final Map<String, ContentSummary> resultMap = new ConcurrentHashMap<String, ContentSummary>();
       ArrayList<Future<?>> results = new ArrayList<Future<?>>();
-      final ThreadPoolExecutor executor;
+      final ExecutorService executor;
       int maxThreads = ctx.getConf().getInt("mapred.dfsclient.parallelism.max", 0);
       if (pathNeedProcess.size() > 1 && maxThreads > 1) {
         int numExecutors = Math.min(pathNeedProcess.size(), maxThreads);
         LOG.info("Using " + numExecutors + " threads for getContentSummary");
-        executor = new ThreadPoolExecutor(numExecutors, numExecutors, 60, TimeUnit.SECONDS,
-            new LinkedBlockingQueue<Runnable>());
+        executor = Executors.newFixedThreadPool(numExecutors,
+            new ThreadFactoryBuilder().setDaemon(true)
+                .setNameFormat("Get-Input-Summary-%d").build());
       } else {
         executor = null;
       }
@@ -2191,11 +2188,19 @@ public final class Utilities {
                   resultMap.put(pathStr, cs.getContentSummary(p, myJobConf));
                   return;
                 }
-                HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf,
-                    SerDeUtils.createOverlayedProperties(
-                        partDesc.getTableDesc().getProperties(),
-                        partDesc.getProperties())
-                        .getProperty(hive_metastoreConstants.META_TABLE_STORAGE));
+
+                String metaTableStorage = null;
+                if (partDesc.getTableDesc() != null &&
+                    partDesc.getTableDesc().getProperties() != null) {
+                  metaTableStorage = partDesc.getTableDesc().getProperties()
+                      .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, null);
+                }
+                if (partDesc.getProperties() != null) {
+                  metaTableStorage = partDesc.getProperties()
+                      .getProperty(hive_metastoreConstants.META_TABLE_STORAGE, metaTableStorage);
+                }
+
+                HiveStorageHandler handler = HiveUtils.getStorageHandler(myConf, metaTableStorage);
                 if (handler instanceof InputEstimator) {
                   long total = 0;
                   TableDesc tableDesc = partDesc.getTableDesc();
@@ -2207,14 +2212,15 @@ public final class Utilities {
                     Utilities.setColumnTypeList(jobConf, scanOp, true);
                     PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
                     Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
-                    total += estimator.estimate(myJobConf, scanOp, -1).getTotalLength();
+                    total += estimator.estimate(jobConf, scanOp, -1).getTotalLength();
                   }
                   resultMap.put(pathStr, new ContentSummary(total, -1, -1));
+                } else {
+                  // todo: should nullify summary for non-native tables,
+                  // not to be selected as a mapjoin target
+                  FileSystem fs = p.getFileSystem(myConf);
+                  resultMap.put(pathStr, fs.getContentSummary(p));
                 }
-                // todo: should nullify summary for non-native tables,
-                // not to be selected as a mapjoin target
-                FileSystem fs = p.getFileSystem(myConf);
-                resultMap.put(pathStr, fs.getContentSummary(p));
               } catch (Exception e) {
                 // We safely ignore this exception for summary data.
                 // We don't update the cache to protect it from polluting other

http://git-wip-us.apache.org/repos/asf/hive/blob/0c3d5da7/ql/src/test/org/apache/hadoop/hive/ql/exec/InputEstimatorTestClass.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/InputEstimatorTestClass.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/InputEstimatorTestClass.java
new file mode 100644
index 0000000..8c52979
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/InputEstimatorTestClass.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.exec;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.ql.security.authorization.HiveAuthorizationProvider;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputFormat;
+
+import java.util.Map;
+
+/**
+ * A test-only HiveStorageHandler / InputEstimator implementation used to exercise the InputEstimator code path in some Utilities methods.
+ */
+public class InputEstimatorTestClass implements HiveStorageHandler, InputEstimator {
+  private static Estimation expectedEstimation = new Estimation(0, 0);
+
+  public InputEstimatorTestClass() {
+  }
+
+  public static void setEstimation(Estimation estimation) {
+    expectedEstimation = estimation;
+  }
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return null;
+  }
+
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return null;
+  }
+
+  @Override
+  public Class<? extends AbstractSerDe> getSerDeClass() {
+    return null;
+  }
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return null;
+  }
+
+  @Override
+  public HiveAuthorizationProvider getAuthorizationProvider() throws HiveException {
+    return null;
+  }
+
+  @Override
+  public void configureInputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+  }
+
+  @Override
+  public void configureOutputJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+  }
+
+  @Override
+  public void configureTableJobProperties(TableDesc tableDesc, Map<String, String> jobProperties) {
+
+  }
+
+  @Override
+  public void configureJobConf(TableDesc tableDesc, JobConf jobConf) {
+
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+
+  }
+
+  @Override
+  public Configuration getConf() {
+    return null;
+  }
+
+  @Override
+  public Estimation estimate(JobConf job, TableScanOperator ts, long remaining) throws HiveException {
+    return expectedEstimation;
+  }
+}

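The helper above does nothing by itself; the tests that follow register it as the table's storage handler through the META_TABLE_STORAGE table property, which is what steers getInputSummary() onto the InputEstimator code path. A condensed sketch of that wiring, using the same values as the test below (the surrounding class and main method are illustrative only):

    import java.util.Properties;
    import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
    import org.apache.hadoop.hive.ql.exec.InputEstimatorTestClass;
    import org.apache.hadoop.hive.ql.metadata.InputEstimator;

    public class EstimatorWiringSketch {
      public static void main(String[] args) {
        // Point the test table at the fake storage handler / estimator.
        Properties tableProperties = new Properties();
        tableProperties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE,
            InputEstimatorTestClass.class.getName());

        // Tell the fake what to report: 5 rows and 10 bytes per partition,
        // regardless of what is actually sitting on the filesystem.
        InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(5, 10));
      }
    }

The Properties object then travels into getInputSummary() inside the TableDesc that each PartitionDesc carries, as runTestGetInputSummary() shows below.
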
http://git-wip-us.apache.org/repos/asf/hive/blob/0c3d5da7/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
index e444946..5a9d83c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestUtilities.java
@@ -37,13 +37,16 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.Context;
-import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat;
-import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat;
+import org.apache.hadoop.hive.ql.io.*;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.InputEstimator;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.plan.DynamicPartitionCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
@@ -57,7 +60,12 @@ import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.udf.generic.GenericUDFFromUtcTimestamp;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.RecordReader;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
@@ -353,4 +361,143 @@ public class TestUtilities {
       }
     }
   }
+
+  @Test
+  public void testGetInputSummaryWithASingleThread() throws IOException {
+    final int NUM_PARTITIONS = 5;
+    final int BYTES_PER_FILE = 5;
+
+    JobConf jobConf = new JobConf();
+    Properties properties = new Properties();
+
+    jobConf.setInt("mapred.dfsclient.parallelism.max", 0);
+    ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
+    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
+    assertEquals(NUM_PARTITIONS, summary.getFileCount());
+    assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
+  }
+
+  @Test
+  public void testGetInputSummaryWithMultipleThreads() throws IOException {
+    final int NUM_PARTITIONS = 5;
+    final int BYTES_PER_FILE = 5;
+
+    JobConf jobConf = new JobConf();
+    Properties properties = new Properties();
+
+    jobConf.setInt("mapred.dfsclient.parallelism.max", 2);
+    ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE, HiveInputFormat.class);
+    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
+    assertEquals(NUM_PARTITIONS, summary.getFileCount());
+    assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
+  }
+
+  @Test
+  public void testGetInputSummaryWithInputEstimator() throws IOException, HiveException {
+    final int NUM_PARTITIONS = 5;
+    final int BYTES_PER_FILE = 10;
+    final int NUM_OF_ROWS = 5;
+
+    JobConf jobConf = new JobConf();
+    Properties properties = new Properties();
+
+    jobConf.setInt("mapred.dfsclient.parallelism.max", 2);
+
+    properties.setProperty(hive_metastoreConstants.META_TABLE_STORAGE, InputEstimatorTestClass.class.getName());
+    InputEstimatorTestClass.setEstimation(new InputEstimator.Estimation(NUM_OF_ROWS, BYTES_PER_FILE));
+
+    /* Write more bytes to the files than the estimator reports, so we can verify that the size comes from the Estimation and not from the filesystem */
+    ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, HiveInputFormat.class);
+    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
+    assertEquals(NUM_PARTITIONS * -1, summary.getFileCount());        // Current getInputSummary() adds a file count of -1 per partition path when an InputEstimator is used
+    assertEquals(NUM_PARTITIONS * -1, summary.getDirectoryCount());   // Current getInputSummary() adds a directory count of -1 per partition path when an InputEstimator is used
+  }
+
+  static class ContentSummaryInputFormatTestClass extends FileInputFormat implements ContentSummaryInputFormat {
+    private static ContentSummary summary = new ContentSummary.Builder().build();
+
+    public static void setContentSummary(ContentSummary contentSummary) {
+      summary = contentSummary;
+    }
+
+    @Override
+    public RecordReader getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
+      return null;
+    }
+
+    @Override
+    public ContentSummary getContentSummary(Path p, JobConf job) throws IOException {
+      return summary;
+    }
+  }
+
+  @Test
+  public void testGetInputSummaryWithContentSummaryInputFormat() throws IOException {
+    final int NUM_PARTITIONS = 5;
+    final int BYTES_PER_FILE = 10;
+
+    JobConf jobConf = new JobConf();
+    Properties properties = new Properties();
+
+    jobConf.setInt("mapred.dfsclient.parallelism.max", 2);
+
+    ContentSummaryInputFormatTestClass.setContentSummary(
+        new ContentSummary.Builder().length(BYTES_PER_FILE).fileCount(2).directoryCount(1).build());
+
+    /* Write more bytes to the files than the input format reports, so we can verify that the size comes from ContentSummaryInputFormat and not from the filesystem */
+    ContentSummary summary = runTestGetInputSummary(jobConf, properties, NUM_PARTITIONS, BYTES_PER_FILE * 2, ContentSummaryInputFormatTestClass.class);
+    assertEquals(NUM_PARTITIONS * BYTES_PER_FILE, summary.getLength());
+    assertEquals(NUM_PARTITIONS * 2, summary.getFileCount());
+    assertEquals(NUM_PARTITIONS, summary.getDirectoryCount());
+  }
+
+  private ContentSummary runTestGetInputSummary(JobConf jobConf, Properties properties, int numOfPartitions, int bytesPerFile, Class<? extends InputFormat> inputFormatClass) throws IOException {
+    // creates scratch directories needed by the Context object
+    SessionState.start(new HiveConf());
+
+    MapWork mapWork = new MapWork();
+    Context context = new Context(jobConf);
+    LinkedHashMap<Path, PartitionDesc> pathToPartitionInfo = new LinkedHashMap<>();
+    LinkedHashMap<Path, ArrayList<String>> pathToAliasTable = new LinkedHashMap<>();
+    TableScanOperator scanOp = new TableScanOperator();
+
+    PartitionDesc partitionDesc = new PartitionDesc(new TableDesc(inputFormatClass, null, properties), null);
+
+    String testTableName = "testTable";
+
+    Path testTablePath = new Path(testTableName);
+    Path[] testPartitionsPaths = new Path[numOfPartitions];
+    for (int i=0; i<numOfPartitions; i++) {
+      String testPartitionName = "p=" + i;
+      testPartitionsPaths[i] = new Path(testTablePath, "p=" + i);
+
+      pathToPartitionInfo.put(testPartitionsPaths[i], partitionDesc);
+
+      pathToAliasTable.put(testPartitionsPaths[i], Lists.newArrayList(testPartitionName));
+
+      mapWork.getAliasToWork().put(testPartitionName, scanOp);
+    }
+
+    mapWork.setPathToAliases(pathToAliasTable);
+    mapWork.setPathToPartitionInfo(pathToPartitionInfo);
+
+    FileSystem fs = FileSystem.getLocal(jobConf);
+    try {
+      fs.mkdirs(testTablePath);
+      byte[] data = new byte[bytesPerFile];
+
+      for (int i=0; i<numOfPartitions; i++) {
+        fs.mkdirs(testPartitionsPaths[i]);
+        FSDataOutputStream out = fs.create(new Path(testPartitionsPaths[i], "test1.txt"));
+        out.write(data);
+        out.close();
+      }
+
+      return Utilities.getInputSummary(context, mapWork, null);
+    } finally {
+      if (fs.exists(testTablePath)) {
+        fs.delete(testTablePath, true);
+      }
+    }
+  }
 }
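
If you want to run just the new tests locally, the standard Maven Surefire selection should work, e.g. "mvn test -Dtest=TestUtilities" from the ql module of a previously built tree (adjust to your usual Hive build invocation).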