You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by js...@apache.org on 2010/08/05 09:36:40 UTC
svn commit: r982490 - in /hadoop/hive/trunk: ./
common/src/java/org/apache/hadoop/hive/conf/ conf/
ql/src/java/org/apache/hadoop/hive/ql/exec/
ql/src/test/queries/clientnegative/ ql/src/test/results/clientnegative/
Author: jssarma
Date: Thu Aug 5 07:36:39 2010
New Revision: 982490
URL: http://svn.apache.org/viewvc?rev=982490&view=rev
Log:
HIVE-1509. Monitor the working set of the number of files
(Ning Zhang via jssarma)
Added:
hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q
hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out
Modified:
hadoop/hive/trunk/CHANGES.txt
hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hadoop/hive/trunk/conf/hive-default.xml
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Thu Aug 5 07:36:39 2010
@@ -65,6 +65,9 @@ Trunk - Unreleased
BUG FIXES
+ HIVE-1509. Hive should kill query if too many files are created by it
+ (Ning Zhang via jssarma)
+
HIVE-1471. CTAS should unescape column names in select-clause
(Ning Zhang via jssarma)
Modified: hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hadoop/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Thu Aug 5 07:36:39 2010
@@ -91,6 +91,7 @@ public class HiveConf extends Configurat
DYNAMICPARTITIONINGMODE("hive.exec.dynamic.partition.mode", "strict"),
DYNAMICPARTITIONMAXPARTS("hive.exec.max.dynamic.partitions", 1000),
DYNAMICPARTITIONMAXPARTSPERNODE("hive.exec.max.dynamic.partitions.pernode", 100),
+ MAXCREATEDFILES("hive.exec.max.created.files", 100000L),
DEFAULTPARTITIONNAME("hive.exec.default.partition.name", "__HIVE_DEFAULT_PARTITION__"),
Modified: hadoop/hive/trunk/conf/hive-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/conf/hive-default.xml?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/conf/hive-default.xml (original)
+++ hadoop/hive/trunk/conf/hive-default.xml Thu Aug 5 07:36:39 2010
@@ -588,6 +588,13 @@
</property>
<property>
+ <name>hive.exec.max.created.files</name>
+ <value>100000</value>
+ <description>Maximum number of HDFS files created by all mappers/reducers in a MapReduce job.</description>
+</property>
+
+
+<property>
<name>hive.default.partition.name</name>
<value>__HIVE_DEFAULT_PARTITION__</value>
<description>The default partition name in case the dynamic partition column value is null/empty string or anyother values that cannot be escaped. This value must not contain any special character used in HDFS URI (e.g., ':', '%', '/' etc). The user has to be aware that the dynamic partition value should not contain this value to avoid confusions.</description>
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java Thu Aug 5 07:36:39 2010
@@ -19,14 +19,12 @@
package org.apache.hadoop.hive.ql.exec;
import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.UnsupportedEncodingException;
-import java.net.URI;
-import java.net.URLDecoder;
import java.net.URL;
+import java.net.URLDecoder;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
@@ -38,22 +36,21 @@ import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
-import java.util.Random;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.Context;
+import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.exec.Operator.ProgressCounter;
import org.apache.hadoop.hive.ql.exec.errors.ErrorAndSolution;
import org.apache.hadoop.hive.ql.exec.errors.TaskLogProcessor;
import org.apache.hadoop.hive.ql.history.HiveHistory.Keys;
@@ -61,9 +58,9 @@ import org.apache.hadoop.hive.ql.io.Hive
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.FetchWork;
-import org.apache.hadoop.hive.ql.plan.MapredWork;
-import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
import org.apache.hadoop.hive.ql.plan.FileSinkDesc;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
import org.apache.hadoop.hive.ql.plan.PartitionDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.api.StageType;
@@ -77,14 +74,13 @@ import org.apache.hadoop.mapred.FileInpu
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.log4j.BasicConfigurator;
-import org.apache.log4j.varia.NullAppender;
import org.apache.log4j.LogManager;
import org.apache.log4j.PropertyConfigurator;
+import org.apache.log4j.varia.NullAppender;
/**
* ExecDriver.
@@ -293,6 +289,13 @@ public class ExecDriver extends Task<Map
// we may still be able to retrieve the job status - so ignore
return false;
}
+ // check for number of created files
+ long numFiles = ctrs.getCounter(ProgressCounter.CREATED_FILES);
+ long upperLimit = HiveConf.getLongVar(job, HiveConf.ConfVars.MAXCREATEDFILES);
+ if (numFiles > upperLimit) {
+ errMsg.append("total number of created files exceeds ").append(upperLimit);
+ return true;
+ }
for (Operator<? extends Serializable> op : work.getAliasToWork().values()) {
if (op.checkFatalErrors(ctrs, errMsg)) {
@@ -640,8 +643,9 @@ public class ExecDriver extends Task<Map
} finally {
Utilities.clearMapRedWork(job);
try {
- if(ctxCreated)
+ if(ctxCreated) {
ctx.clear();
+ }
if (rj != null) {
if (returnVal != 0) {
@@ -895,9 +899,10 @@ public class ExecDriver extends Task<Map
private static void setupChildLog4j(Configuration conf) {
URL hive_l4j = ExecDriver.class.getClassLoader().getResource
(SessionState.HIVE_EXEC_L4J);
- if(hive_l4j == null)
+ if(hive_l4j == null) {
hive_l4j = ExecDriver.class.getClassLoader().getResource
(SessionState.HIVE_L4J);
+ }
if (hive_l4j != null) {
// setting queryid so that log4j configuration can use it to generate
@@ -1072,13 +1077,13 @@ public class ExecDriver extends Task<Map
sb.append(URLEncoder.encode(hconf.get(hadoopWorkDir) + "/"
+ Utilities.randGen.nextInt(), "UTF-8"));
}
-
+
return sb.toString();
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
}
-
+
@Override
public boolean isMapRedTask() {
return true;
@@ -1244,8 +1249,9 @@ public class ExecDriver extends Task<Map
if (m != null) {
for (FetchWork fw: m.values()) {
String s = fw.getTblDir();
- if ((s != null) && ctx.isMRTmpFileURI(s))
+ if ((s != null) && ctx.isMRTmpFileURI(s)) {
fw.setTblDir(ctx.localizeMRTmpFileURI(s));
+ }
}
}
}
@@ -1253,27 +1259,30 @@ public class ExecDriver extends Task<Map
// fix up outputs
Map<String, ArrayList<String>> pa = work.getPathToAliases();
if (pa != null) {
- for (List<String> ls: pa.values())
+ for (List<String> ls: pa.values()) {
for (String a: ls) {
ArrayList<Operator<? extends Serializable>> opList = new
ArrayList<Operator<? extends Serializable>> ();
opList.add(work.getAliasToWork().get(a));
-
+
while (!opList.isEmpty()) {
Operator<? extends Serializable> op = opList.remove(0);
if (op instanceof FileSinkOperator) {
FileSinkDesc fdesc = ((FileSinkOperator)op).getConf();
String s = fdesc.getDirName();
- if ((s != null) && ctx.isMRTmpFileURI(s))
+ if ((s != null) && ctx.isMRTmpFileURI(s)) {
fdesc.setDirName(ctx.localizeMRTmpFileURI(s));
+ }
((FileSinkOperator)op).setConf(fdesc);
}
- if (op.getChildOperators() != null)
+ if (op.getChildOperators() != null) {
opList.addAll(op.getChildOperators());
+ }
}
}
+ }
}
}
}
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Thu Aug 5 07:36:39 2010
@@ -49,10 +49,10 @@ import org.apache.hadoop.hive.serde2.Ser
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.SubStructObjectInspector;
-import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
@@ -463,6 +463,10 @@ public class FileSinkOperator extends Te
// buckets of dynamic partitions will be created for each newly created partition
fsp.outWriters[filesIdx] = HiveFileFormatUtils.getHiveRecordWriter(
jc, conf.getTableInfo(), outputClass, conf, fsp.outPaths[filesIdx]);
+ // increment the CREATED_FILES counter
+ if (reporter != null) {
+ reporter.incrCounter(ProgressCounter.CREATED_FILES, 1);
+ }
filesIdx++;
}
assert filesIdx == numFiles;
@@ -517,9 +521,7 @@ public class FileSinkOperator extends Te
}
try {
- if (reporter != null) {
- reporter.progress();
- }
+ updateProgress();
// if DP is enabled, get the final output writers and prepare the real output row
assert inputObjInspectors[0].getCategory() == ObjectInspector.Category.STRUCT:
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Thu Aug 5 07:36:39 2010
@@ -25,8 +25,8 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Properties;
import java.util.Map.Entry;
+import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
@@ -185,7 +185,7 @@ public class MapOperator extends Operato
* Initializes this map op as the root of the tree. It sets JobConf &
* MapRedWork and starts initialization of the operator tree rooted at this
* op.
- *
+ *
* @param hconf
* @param mrwork
* @throws HiveException
@@ -292,7 +292,7 @@ public class MapOperator extends Operato
Operator<? extends Serializable> op = conf.getAliasToWork().get(
onealias);
LOG.info("Adding alias " + onealias + " to work list for file "
- + fpath.toUri().getPath());
+ + onefile);
MapInputPath inp = new MapInputPath(onefile, onealias, op);
opCtxMap.put(inp, opCtx);
if (operatorToPaths.get(op) == null) {
@@ -449,15 +449,15 @@ public class MapOperator extends Operato
try {
rawRowString = value.toString();
} catch (Exception e2) {
- rawRowString = "[Error getting row data with exception " +
+ rawRowString = "[Error getting row data with exception " +
StringUtils.stringifyException(e2) + " ]";
}
-
+
// TODO: policy on deserialization errors
deserialize_error_count.set(deserialize_error_count.get() + 1);
throw new HiveException("Hive Runtime Error while processing writable " + rawRowString, e);
}
-
+
try {
if (this.hasVC) {
forward(this.rowWithPartAndVC, this.rowObjectInspector);
@@ -478,7 +478,7 @@ public class MapOperator extends Operato
rowString = SerDeUtils.getJSONString(rowWithPart, rowObjectInspector);
}
} catch (Exception e2) {
- rowString = "[Error getting row data with exception " +
+ rowString = "[Error getting row data with exception " +
StringUtils.stringifyException(e2) + " ]";
}
throw new HiveException("Hive Runtime Error while processing row " + rowString, e);
Modified: hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=982490&r1=982489&r2=982490&view=diff
==============================================================================
--- hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Thu Aug 5 07:36:39 2010
@@ -29,7 +29,6 @@ import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.ExecMapperContext;
import org.apache.hadoop.hive.ql.lib.Node;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.Explain;
@@ -845,6 +844,7 @@ public abstract class Operator<T extends
* TODO This is a hack for hadoop 0.17 which only supports enum counters.
*/
public static enum ProgressCounter {
+ CREATED_FILES,
C1, C2, C3, C4, C5, C6, C7, C8, C9, C10,
C11, C12, C13, C14, C15, C16, C17, C18, C19, C20,
C21, C22, C23, C24, C25, C26, C27, C28, C29, C30,
Added: hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q?rev=982490&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q (added)
+++ hadoop/hive/trunk/ql/src/test/queries/clientnegative/dyn_part3.q Thu Aug 5 07:36:39 2010
@@ -0,0 +1,9 @@
+set hive.exec.max.dynamic.partitions=600;
+set hive.exec.max.dynamic.partitions.pernode=600;
+set hive.exec.dynamic.partition.mode=nonstrict;
+set hive.exec.dynamic.partition=true;
+set hive.exec.max.created.files=100;
+
+create table nzhang_part( key string) partitioned by (value string);
+
+insert overwrite table nzhang_part partition(value) select key, value from src;
Added: hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out?rev=982490&view=auto
==============================================================================
--- hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out (added)
+++ hadoop/hive/trunk/ql/src/test/results/clientnegative/dyn_part3.q.out Thu Aug 5 07:36:39 2010
@@ -0,0 +1,9 @@
+PREHOOK: query: create table nzhang_part( key string) partitioned by (value string)
+PREHOOK: type: CREATETABLE
+POSTHOOK: query: create table nzhang_part( key string) partitioned by (value string)
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: default@nzhang_part
+PREHOOK: query: insert overwrite table nzhang_part partition(value) select key, value from src
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.MapRedTask