Posted to commits@hive.apache.org by js...@apache.org on 2010/08/05 09:36:40 UTC

svn commit: r982490 - in /hadoop/hive/trunk: ./ common/src/java/org/apache/hadoop/hive/conf/ conf/ ql/src/java/org/apache/hadoop/hive/ql/exec/ ql/src/test/queries/clientnegative/ ql/src/test/results/clientnegative/

Author: jssarma
Date: Thu Aug  5 07:36:39 2010
New Revision: 982490

URL: http://svn.apache.org/viewvc?rev=982490&view=rev
Log:
HIVE-1509. Monitor the working set of the number of files
(Ning Zhang via jssarma)

Added:
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hadoop/hive/trunk/conf/hive-default.xml
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Aug  5 07:36:39 2010
@@ -65,6 +65,9 @@ Trunk -  Unreleased
 
   BUG FIXES
 
+    HIVE-1509. Hive should kill query if too many files are created by it
+    (Ning Zhang via jssarma)
+
     HIVE-1471. CTAS should unescape column names in select-clause
     (Ning Zhang via jssarma)
 

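In brief, the change has two halves: every FileSinkOperator bumps a new job-wide CREATED_FILES counter each time it opens an output file, and ExecDriver reads the aggregated counter while polling the running job, failing the query once it passes hive.exec.max.created.files. A condensed, hypothetical sketch of the ExecDriver-side check follows (the class and method names are made up for illustration; the Counters and JobConf objects come from the running MapReduce job, and the logic is simplified from the ExecDriver diff below):

    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
    import org.apache.hadoop.mapred.Counters;
    import org.apache.hadoop.mapred.JobConf;

    public class CreatedFilesCheckSketch {
      // Returns true when the job has already created more files than
      // hive.exec.max.created.files allows; mirrors (in simplified form)
      // the new fatal-error check added to ExecDriver in this commit.
      public static boolean exceedsCreatedFilesLimit(Counters ctrs, JobConf job,
          StringBuilder errMsg) {
        long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
        long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
        if (numFiles > upperLimit) {
          errMsg.append("total number of created files exceeds ").append(upperLimit);
          return true;
        }
        return false;
      }
    }
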
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Aug  5 07:36:39 2010
@@ -91,6 +91,7 @@ public class HiveConf extends Configurat
     DYNAMICPARTITIONINGMODE("hive.exec.dynamic.partition.mode", "strict"),
     DYNAMICPARTITIONMAXPARTS("hive.exec.max.dynamic.partitions", 1000),
     DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
+    MAXCREATEDFILES("hive.exec.max.created.files", 100000L),
     DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
 
 

Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Thu Aug  5 07:36:39 2010
@@ -588,6 +588,13 @@
 </property>
 
 <property>
+  <name>hive.exec.max.created.files</name>
+  <value>100000</value>
+  <description>Maximum number of HDFS files created by all mappers/reducers in a MapReduce job.</description>
+</property>
+
+
+<property>
   <name>hive.default.partition.name</name>
   <value>__HIVE_DEFAULT_PARTITION__</value>
   <description>The default partition name in case the dynamic partition column value is null/empty string or anyother values that cannot be escaped. This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). The user has to be aware that the dynamic partition value should not contain this value to avoid confusions.</description>

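Like any other Hive setting, the new limit can be overridden in hive-site.xml or per session (for example with the CLI's set command). A minimal, hypothetical sketch of reading and tightening the cap programmatically through HiveConf (the class name and the 50000 value are illustrative only):

    import org.apache.hadoop.hive.conf.HiveConf;

    public class MaxCreatedFilesConfigSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf(MaxCreatedFilesConfigSketch.class);

        // hive-default.xml ships 100000 as the default.
        long limit = HiveConf.getLongVar(conf, HiveConf.ConfVars.MAXCREATEDFILES);
        System.out.println("hive.exec.max.created.files = " + limit);

        // Tighten the cap for jobs launched with this configuration.
        conf.setLong(HiveConf.ConfVars.MAXCREATEDFILES.varname, 50000L);
      }
    }
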
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Aug  5 07:36:39 2010
@@ -19,14 +19,12 @@
 package org.apache.hadoop.hive.ql.exec;
 
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.Serializable;
 import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLDecoder;
 import java.net.URL;
+import java.net.URLDecoder;
 import java.net.URLEncoder;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -38,22 +36,21 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Random;
 import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
 import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
 import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
 import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
@@ -61,9 +58,9 @@ import org.apache.hadoop.hive.ql.io.Hive
 import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
 import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
 import org.apache.hadoop.hive.ql.plan.PartitionDesc;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -77,14 +74,13 @@ import org.apache.hadoop.mapred.FileInpu
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskCompletionEvent;
 import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.varia.NullAppender;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.varia.NullAppender;
 
 /**
  * ExecDriver.
@@ -293,6 +289,13 @@ public class ExecDriver extends Task<Map
         // we may still be able to retrieve the job status - so ignore
         return false;
       }
+      // check for number of created files
+      long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+      long upperLimit =  HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+      if (numFiles > upperLimit) {
+        errMsg.append("total number of created files exceeds ").append(upperLimit);
+        return true;
+      }
 
       for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
         if (op.checkFatalErrors(ctrs, errMsg)) {
@@ -640,8 +643,9 @@ public class ExecDriver extends Task<Map
     } finally {
       Utilities.clearMapRedWork(job);
       try {
-        if(ctxCreated)
+        if(ctxCreated) {
           ctx.clear();
+        }
 
         if (rj != null) {
           if (returnVal != 0) {
@@ -895,9 +899,10 @@ public class ExecDriver extends Task<Map
   private static void setupChildLog4j(Configuration conf) {
     URL hive_l4j = ExecDriver.class.getClassLoader().getResource
       (SessionState.HIVE_EXEC_L4J);
-    if(hive_l4j == null)
+    if(hive_l4j == null) {
       hive_l4j = ExecDriver.class.getClassLoader().getResource
       (SessionState.HIVE_L4J);
+    }
 
     if (hive_l4j != null) {
         // setting queryid so that log4j configuration can use it to generate
@@ -1072,13 +1077,13 @@ public class ExecDriver extends Task<Map
         sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/"
             + Utilities.randGen.nextInt(), "UTF-8"));
       }
-      
+
       return sb.toString();
     } catch (UnsupportedEncodingException e) {
       throw new RuntimeException(e);
     }
   }
-  
+
   @Override
   public boolean isMapRedTask() {
     return true;
@@ -1244,8 +1249,9 @@ public class ExecDriver extends Task<Map
       if (m != null) {
         for (FetchWork fw: m.values()) {
           String s = fw.getTblDir();
-          if ((s != null) && ctx.isMRTmpFileURI(s))
+          if ((s != null) && ctx.isMRTmpFileURI(s)) {
             fw.setTblDir(ctx.localizeMRTmpFileURI(s));
+          }
         }
       }
     }
@@ -1253,27 +1259,30 @@ public class ExecDriver extends Task<Map
     // fix up outputs
     Map<String, ArrayList<String>> pa = work.getPathToAliases();
     if (pa != null) {
-      for (List<String> ls: pa.values())
+      for (List<String> ls: pa.values()) {
         for (String a: ls) {
           ArrayList<Operator<? extends Serializable>> opList = new
             ArrayList<Operator<? extends Serializable>> ();
           opList.add(work.getAliasToWork().get(a));
-          
+
           while (!opList.isEmpty()) {
             Operator<? extends Serializable> op = opList.remove(0);
 
             if (op instanceof FileSinkOperator) {
               FileSinkDesc fdesc = ((FileSinkOperator)op).getConf();
               String s = fdesc.getDirName();
-              if ((s != null) && ctx.isMRTmpFileURI(s))
+              if ((s != null) && ctx.isMRTmpFileURI(s)) {
                 fdesc.setDirName(ctx.localizeMRTmpFileURI(s));
+              }
               ((FileSinkOperator)op).setConf(fdesc);
             }
 
-            if (op.getChildOperators() != null)
+            if (op.getChildOperators() != null) {
               opList.addAll(op.getChildOperators());
+            }
           }
         }
+      }
     }
   }
 }

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Aug  5 07:36:39 2010
@@ -49,10 +49,10 @@ import org.apache.hadoop.hive.serde2.Ser
 import org.apache.hadoop.hive.serde2.Serializer;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
@@ -463,6 +463,10 @@ public class FileSinkOperator extends Te
         // buckets of dynamic partitions will be created for each newly created partition
         fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
               jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
+        // increment the CREATED_FILES counter
+        if (reporter != null) {
+          reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
+        }
         filesIdx++;
       }
       assert filesIdx == numFiles;
@@ -517,9 +521,7 @@ public class FileSinkOperator extends Te
     }
 
     try {
-      if (reporter != null) {
-        reporter.progress();
-      }
+      updateProgress();
 
       // if DP is enabled, get the final output writers and prepare the real output row
       assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT:

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Aug  5 07:36:39 2010
@@ -25,8 +25,8 @@ import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Properties;
 import java.util.Map.Entry;
+import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -185,7 +185,7 @@ public class MapOperator extends Operato
    * Initializes this map op as the root of the tree. It sets JobConf &
    * MapRedWork and starts initialization of the operator tree rooted at this
    * op.
-   * 
+   *
    * @param hconf
    * @param mrwork
    * @throws HiveException
@@ -292,7 +292,7 @@ public class MapOperator extends Operato
           Operator<? extends Serializable> op = conf.getAliasToWork().get(
               onealias);
           LOG.info("Adding alias " + onealias + " to work list for file "
-              + fpath.toUri().getPath());
+              + onefile);
           MapInputPath inp = new MapInputPath(onefile, onealias, op);
           opCtxMap.put(inp, opCtx);
           if (operatorToPaths.get(op) == null) {
@@ -449,15 +449,15 @@ public class MapOperator extends Operato
       try {
         rawRowString = value.toString();
       } catch (Exception e2) {
-        rawRowString = "[Error getting row data with exception " + 
+        rawRowString = "[Error getting row data with exception " +
             StringUtils.stringifyException(e2) + " ]";
       }
-      
+
       // TODO: policy on deserialization errors
       deserialize_error_count.set(deserialize_error_count.get() + 1);
       throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
     }
-    
+
     try {
       if (this.hasVC) {
         forward(this.rowWithPartAndVC, this.rowObjectInspector);
@@ -478,7 +478,7 @@ public class MapOperator extends Operato
           rowString = SerDeUtils.getJSONString(rowWithPart, rowObjectInspector);
         }
       } catch (Exception e2) {
-        rowString = "[Error getting row data with exception " + 
+        rowString = "[Error getting row data with exception " +
             StringUtils.stringifyException(e2) + " ]";
       }
       throw new HiveException("Hive Runtime Error while processing row " + rowString, e);

Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Aug  5 07:36:39 2010
@@ -29,7 +29,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.ExecMapperContext;
 import org.apache.hadoop.hive.ql.lib.Node;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.plan.Explain;
@@ -845,6 +844,7 @@ public abstract class Operator<T extends
    * TODO This is a hack for hadoop 0.17 which only supports enum counters.
    */
   public static enum ProgressCounter {
+    CREATED_FILES,
     C1, C2, C3, C4, C5, C6, C7, C8, C9, C10,
     C11, C12, C13, C14, C15, C16, C17, C18, C19, C20,
     C21, C22, C23, C24, C25, C26, C27, C28, C29, C30,

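CREATED_FILES is added at the front of the generic ProgressCounter enum because, as the comment above it notes, the counter API this code targets (Hadoop 0.17-era) only accepts enum constants known at compile time. Each task then bumps the counter through its MapReduce Reporter, as in this small hypothetical helper (the class and method names are illustrative; FileSinkOperator does the same thing inline, guarding against a null reporter in local or test runs):

    import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
    import org.apache.hadoop.mapred.Reporter;

    public class CreatedFilesCounterSketch {
      // Record one newly opened output file on the job-wide CREATED_FILES counter.
      public static void countNewFile(Reporter reporter) {
        if (reporter != null) {
          reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
        }
      }
    }
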
Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q?rev=982490&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q Thu Aug  5 07:36:39 2010
@@ -0,0 +1,9 @@
+set hive.exec.max.dynamic.partitions=600;
+set hive.exec.max.dynamic.partitions.pernode=600;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.dynamic.partition=true;
+set hive.exec.max.created.files=100;
+
+create table nzhang_part( key string) partitioned by (value string);
+
+insert overwrite table nzhang_part partition(value) select key, value from src;

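The negative test above trips the new limit on purpose: with hive.exec.max.created.files set to 100, the dynamic-partition insert from the standard src test table (which has a few hundred distinct values) has to create well over 100 partition files, so the job is killed and the expected output below ends in an execution error.
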
Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out?rev=982490&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out Thu Aug  5 07:36:39 2010
@@ -0,0 +1,9 @@
+PREHOOK: query: create table nzhang_part( key string) partitioned by (value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table nzhang_part( key string) partitioned by (value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@nzhang_part
+PREHOOK: query: insert overwrite table nzhang_part partition(value) select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask