You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2013/10/18 04:13:11 UTC

svn commit: r1533311 - /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java

Author: gunther
Date: Fri Oct 18 02:13:11 2013
New Revision: 1533311

URL: http://svn.apache.org/r1533311
Log:
HIVE-5586: Call createSplits with multiple paths if partition specs match on Tez (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java?rev=1533311&r1=1533310&r2=1533311&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java Fri Oct 18 02:13:11 2013
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 
@@ -41,6 +42,7 @@ import org.apache.hadoop.hive.ql.plan.Ex
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -252,6 +254,22 @@ public class HiveInputFormat<K extends W
     pathToPartitionInfo = mrwork.getPathToPartitionInfo();
   }
 
+  private void addSplitsForGroup(List<Path> dirs, TableScanOperator tableScan, JobConf conf,
+      InputFormat inputFormat, Class<? extends InputFormat> inputFormatClass, int splits,
+      TableDesc table, List<InputSplit> result) throws IOException {
+
+    Utilities.copyTableJobPropertiesToConf(table, conf);
+
+    pushFilters(conf, tableScan);
+
+    FileInputFormat.setInputPaths(conf, dirs.toArray(new Path[dirs.size()]));
+    conf.setInputFormat(inputFormat.getClass());
+    InputSplit[] iss = inputFormat.getSplits(conf, splits);
+    for (InputSplit is : iss) {
+      result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+    }
+  }
+
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     PerfLogger perfLogger = PerfLogger.getPerfLogger();
     perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.GET_SPLITS);
@@ -262,36 +280,60 @@ public class HiveInputFormat<K extends W
       throw new IOException("No input paths specified in job");
     }
     JobConf newjob = new JobConf(job);
-    ArrayList<InputSplit> result = new ArrayList<InputSplit>();
+    List<InputSplit> result = new ArrayList<InputSplit>();
+
+    List<Path> currentDirs = new ArrayList<Path>();
+    Class<? extends InputFormat> currentInputFormatClass = null;
+    TableDesc currentTable = null;
+    TableScanOperator currentTableScan = null;
 
     // for each dir, get the InputFormat, and do getSplits.
     for (Path dir : dirs) {
       PartitionDesc part = getPartitionDescFromPath(pathToPartitionInfo, dir);
-      // create a new InputFormat instance if this is the first time to see this
-      // class
-      Class inputFormatClass = part.getInputFileFormatClass();
-      InputFormat inputFormat = getInputFormatFromCache(inputFormatClass, job);
-      Utilities.copyTableJobPropertiesToConf(part.getTableDesc(), newjob);
+      Class<? extends InputFormat> inputFormatClass = part.getInputFileFormatClass();
+      TableDesc table = part.getTableDesc();
+      TableScanOperator tableScan = null;
+
+      List<String> aliases =
+          mrwork.getPathToAliases().get(dir.toUri().toString());
 
       // Make filter pushdown information available to getSplits.
-      ArrayList<String> aliases =
-        mrwork.getPathToAliases().get(dir.toUri().toString());
       if ((aliases != null) && (aliases.size() == 1)) {
         Operator op = mrwork.getAliasToWork().get(aliases.get(0));
         if ((op != null) && (op instanceof TableScanOperator)) {
-          TableScanOperator tableScan = (TableScanOperator) op;
-          pushFilters(newjob, tableScan);
+          tableScan = (TableScanOperator) op;
         }
       }
 
-      FileInputFormat.setInputPaths(newjob, dir);
-      newjob.setInputFormat(inputFormat.getClass());
-      InputSplit[] iss = inputFormat.getSplits(newjob, numSplits / dirs.length);
-      for (InputSplit is : iss) {
-        result.add(new HiveInputSplit(is, inputFormatClass.getName()));
+      if (!currentDirs.isEmpty() &&
+          inputFormatClass.equals(currentInputFormatClass) &&
+          table.equals(currentTable) &&
+          tableScan == currentTableScan) {
+        currentDirs.add(dir);
+        continue;
+      }
+
+      if (!currentDirs.isEmpty()) {
+        LOG.info("Generating splits");
+        addSplitsForGroup(currentDirs, currentTableScan, newjob,
+            getInputFormatFromCache(currentInputFormatClass, job),
+            currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length),
+            currentTable, result);
       }
+
+      currentDirs.clear();
+      currentDirs.add(dir);
+      currentTableScan = tableScan;
+      currentTable = table;
+      currentInputFormatClass = inputFormatClass;
     }
 
+    LOG.info("Generating splits");
+    addSplitsForGroup(currentDirs, currentTableScan, newjob,
+        getInputFormatFromCache(currentInputFormatClass, job),
+        currentInputFormatClass, currentDirs.size()*(numSplits / dirs.length),
+        currentTable, result);
+
     LOG.info("number of splits " + result.size());
     perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.GET_SPLITS);
     return result.toArray(new HiveInputSplit[result.size()]);