You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by pr...@apache.org on 2016/10/21 07:38:51 UTC

hive git commit: HIVE-14920: S3: Optimize SimpleFetchOptimizer::checkThreshold() (Rajesh Balamohan reviewed by Ashutosh Chauhan)

Repository: hive
Updated Branches:
  refs/heads/master 19999dad8 -> f2efa6a2b


HIVE-14920: S3: Optimize SimpleFetchOptimizer::checkThreshold() (Rajesh Balamohan reviewed by Ashutosh Chauhan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f2efa6a2
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f2efa6a2
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f2efa6a2

Branch: refs/heads/master
Commit: f2efa6a2be52f09e700c931a293c816a446bf619
Parents: 19999da
Author: Prasanth Jayachandran <pr...@apache.org>
Authored: Fri Oct 21 00:38:36 2016 -0700
Committer: Prasanth Jayachandran <pr...@apache.org>
Committed: Fri Oct 21 00:38:36 2016 -0700

----------------------------------------------------------------------
 .../hive/ql/optimizer/SimpleFetchOptimizer.java | 115 +++++++++++++------
 1 file changed, 82 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f2efa6a2/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
index eb0ba7b..0481110 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/SimpleFetchOptimizer.java
@@ -26,7 +26,16 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
+
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.ContentSummary;
@@ -162,13 +171,7 @@ public class SimpleFetchOptimizer extends Transform {
         return true;
       }
     }
-    long remaining = threshold;
-    remaining -= data.getInputLength(pctx, remaining);
-    if (remaining < 0) {
-      LOG.info("Threshold " + remaining + " exceeded for pseudoMR mode");
-      return false;
-    }
-    return true;
+    return data.isDataLengthWitInThreshold(pctx, threshold);
   }
 
   // all we can handle is LimitOperator, FilterOperator SelectOperator and final FS
@@ -414,18 +417,16 @@ public class SimpleFetchOptimizer extends Transform {
       return replaceFSwithLS(fileSink, work.getSerializationNullFormat());
     }
 
-    private long getInputLength(ParseContext pctx, long remaining) throws Exception {
+    private boolean isDataLengthWitInThreshold(ParseContext pctx, final long threshold)
+        throws Exception {
       if (splitSample != null && splitSample.getTotalLength() != null) {
-        return splitSample.getTotalLength();
-      }
-      if (splitSample != null) {
-        return splitSample.getTargetSize(calculateLength(pctx, splitSample.estimateSourceSize(remaining)));
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Threshold " + splitSample.getTotalLength() + " exceeded for pseudoMR mode");
+        }
+        return (threshold - splitSample.getTotalLength()) > 0;
       }
-      return calculateLength(pctx, remaining);
-    }
 
-    private long calculateLength(ParseContext pctx, long remaining) throws Exception {
-      JobConf jobConf = new JobConf(pctx.getConf());
+      final JobConf jobConf = new JobConf(pctx.getConf());
       Utilities.setColumnNameList(jobConf, scanOp, true);
       Utilities.setColumnTypeList(jobConf, scanOp, true);
       HiveStorageHandler handler = table.getStorageHandler();
@@ -434,41 +435,89 @@ public class SimpleFetchOptimizer extends Transform {
         TableDesc tableDesc = Utilities.getTableDesc(table);
         PlanUtils.configureInputJobPropertiesForStorageHandler(tableDesc);
         Utilities.copyTableJobPropertiesToConf(tableDesc, jobConf);
-        return estimator.estimate(jobConf, scanOp, remaining).getTotalLength();
+        long len = estimator.estimate(jobConf, scanOp, threshold).getTotalLength();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+        }
+        return (threshold - len) > 0;
       }
       if (table.isNonNative()) {
-        return 0; // nothing can be done
+        return true; // nothing can be done
       }
       if (!table.isPartitioned()) {
-        return getFileLength(jobConf, table.getPath(), table.getInputFormatClass());
+        long len = getPathLength(jobConf, table.getPath(), table.getInputFormatClass(), threshold);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Threshold " + len + " exceeded for pseudoMR mode");
+        }
+        return (threshold - len) > 0;
       }
-      long total = 0;
-      for (Partition partition : partsList.getNotDeniedPartns()) {
-        Path path = partition.getDataLocation();
-        total += getFileLength(jobConf, path, partition.getInputFormatClass());
-        if (total > remaining) {
-          break;
+      final AtomicLong total = new AtomicLong(0);
+      // TODO: reuse a common thread pool instead of creating one on every call?
+      int threadCount = HiveConf.getIntVar(pctx.getConf(),
+          HiveConf.ConfVars.HIVE_STATS_GATHER_NUM_THREADS);
+      final ExecutorService pool = (threadCount > 0) ?
+          Executors.newFixedThreadPool(threadCount,
+              new ThreadFactoryBuilder()
+                  .setDaemon(true)
+                  .setNameFormat("SimpleFetchOptimizer-FileLength-%d").build()) : null;
+      try {
+        List<Future> futures = Lists.newLinkedList();
+        for (final Partition partition : partsList.getNotDeniedPartns()) {
+          final Path path = partition.getDataLocation();
+          if (pool != null) {
+            futures.add(pool.submit(new Callable<Long>() {
+              @Override
+              public Long call() throws Exception {
+                long len = getPathLength(jobConf, path, partition.getInputFormatClass(), threshold);
+                LOG.trace(path  + ", length=" + len);
+                return total.addAndGet(len);
+              }
+            }));
+          } else {
+            total.addAndGet(getPathLength(jobConf, path, partition.getInputFormatClass(), threshold));
+          }
+        }
+        if (pool != null) {
+          pool.shutdown();
+          for (Future<Long> future : futures) {
+            long totalLen = future.get();
+            if ((threshold - totalLen) <= 0) {
+              // early exit, as getting file lengths can be expensive in object stores.
+              return false;
+            }
+          }
+        }
+        return (threshold - total.get()) >= 0;
+      } finally {
+        LOG.info("Data set size=" + total.get() + ", threshold=" + threshold);
+        if (pool != null) {
+          pool.shutdownNow();
         }
       }
-      return total;
     }
 
-    // from Utilities.getInputSummary()
-    private long getFileLength(JobConf conf, Path path, Class<? extends InputFormat> clazz)
+    private long getPathLength(JobConf conf, Path path,
+        Class<? extends InputFormat> clazz, long threshold)
         throws IOException {
-      ContentSummary summary;
       if (ContentSummaryInputFormat.class.isAssignableFrom(clazz)) {
         InputFormat input = HiveInputFormat.getInputFormatFromCache(clazz, conf);
-        summary = ((ContentSummaryInputFormat)input).getContentSummary(path, conf);
+        return ((ContentSummaryInputFormat)input).getContentSummary(path, conf).getLength();
       } else {
         FileSystem fs = path.getFileSystem(conf);
         try {
-          summary = fs.getContentSummary(path);
+          long length = 0;
+          RemoteIterator<LocatedFileStatus> results = fs.listFiles(path, true);
+          // Stop listing files as soon as the accumulated length exceeds the threshold
+          // (especially beneficial for object stores, where listing is expensive).
+          while (length <= threshold && results.hasNext()) {
+            length += results.next().getLen();
+          }
+          LOG.trace("length=" + length + ", threshold=" + threshold);
+          return length;
         } catch (FileNotFoundException e) {
           return 0;
         }
       }
-      return summary.getLength();
     }
   }