You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/10/18 02:40:47 UTC
svn commit: r1632713 - in /hive/trunk/ql/src:
java/org/apache/hadoop/hive/ql/exec/ java/org/apache/hadoop/hive/ql/exec/mr/
java/org/apache/hadoop/hive/ql/exec/tez/
java/org/apache/hadoop/hive/ql/exec/vector/
test/org/apache/hadoop/hive/ql/exec/
Author: gunther
Date: Sat Oct 18 00:40:46 2014
New Revision: 1632713
URL: http://svn.apache.org/r1632713
Log:
HIVE-8429: Add records in/out counters (Gunther Hagleitner, reviewed by Gopal V)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java Sat Oct 18 00:40:46 2014
@@ -109,6 +109,15 @@ public class FileSinkOperator extends Te
private StructField bucketField; // field bucket is in in record id
private StructObjectInspector recIdInspector; // OI for inspecting record id
private IntObjectInspector bucketInspector; // OI for inspecting bucket id
+ protected transient long numRows = 0;
+ protected transient long cntr = 1;
+
+ /**
+ * Counters.
+ */
+ public static enum Counter {
+ RECORDS_OUT
+ }
/**
* RecordWriter.
@@ -249,7 +258,7 @@ public class FileSinkOperator extends Te
private static final long serialVersionUID = 1L;
protected transient FileSystem fs;
protected transient Serializer serializer;
- protected transient LongWritable row_count;
+ protected final transient LongWritable row_count = new LongWritable();
private transient boolean isNativeTable = true;
/**
@@ -352,7 +361,7 @@ public class FileSinkOperator extends Te
prtner = (HivePartitioner<HiveKey, Object>) ReflectionUtils.newInstance(
jc.getPartitionerClass(), null);
}
- row_count = new LongWritable();
+
if (dpCtx != null) {
dpSetup();
}
@@ -381,6 +390,13 @@ public class FileSinkOperator extends Te
bucketField = recIdInspector.getAllStructFieldRefs().get(1);
bucketInspector = (IntObjectInspector)bucketField.getFieldObjectInspector();
}
+
+ String context = jc.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ","_");
+ }
+ statsMap.put(Counter.RECORDS_OUT + context, row_count);
+
initializeChildren(hconf);
} catch (HiveException e) {
throw e;
@@ -657,9 +673,9 @@ public class FileSinkOperator extends Te
fpaths.stat.addToStat(StatsSetupConst.ROW_COUNT, 1);
}
-
- if (row_count != null) {
- row_count.set(row_count.get() + 1);
+ if (++numRows == cntr) {
+ cntr *= 10;
+ LOG.info(toString() + ": records written - " + numRows);
}
int writerOffset = findWriterOffset(row);
@@ -921,6 +937,9 @@ public class FileSinkOperator extends Te
@Override
public void closeOp(boolean abort) throws HiveException {
+ row_count.set(numRows);
+ LOG.info(toString() + ": records written - " + numRows);
+
if (!bDynParts && !filesCreated) {
createBucketFiles(fsp);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FilterOperator.java Sat Oct 18 00:40:46 2014
@@ -37,29 +37,14 @@ public class FilterOperator extends Oper
Serializable {
private static final long serialVersionUID = 1L;
-
- /**
- * Counter.
- *
- */
- public static enum Counter {
- FILTERED, PASSED
- }
-
- protected final transient LongWritable filtered_count;
- protected final transient LongWritable passed_count;
private transient ExprNodeEvaluator conditionEvaluator;
private transient PrimitiveObjectInspector conditionInspector;
- private transient int consecutiveFails;
private transient int consecutiveSearches;
private transient IOContext ioContext;
protected transient int heartbeatInterval;
public FilterOperator() {
super();
- filtered_count = new LongWritable();
- passed_count = new LongWritable();
- consecutiveFails = 0;
consecutiveSearches = 0;
}
@@ -73,8 +58,6 @@ public class FilterOperator extends Oper
conditionEvaluator = ExprNodeEvaluatorFactory.toCachedEval(conditionEvaluator);
}
- statsMap.put(Counter.FILTERED, filtered_count);
- statsMap.put(Counter.PASSED, passed_count);
conditionInspector = null;
ioContext = IOContext.get(hconf.get(Utilities.INPUT_NAME));
} catch (Throwable e) {
@@ -135,17 +118,6 @@ public class FilterOperator extends Oper
.getPrimitiveJavaObject(condition);
if (Boolean.TRUE.equals(ret)) {
forward(row, rowInspector);
- passed_count.set(passed_count.get() + 1);
- consecutiveFails = 0;
- } else {
- filtered_count.set(filtered_count.get() + 1);
- consecutiveFails++;
-
- // In case of a lot of consecutive failures, send a heartbeat in order to
- // avoid timeout
- if (((consecutiveFails % heartbeatInterval) == 0) && (reporter != null)) {
- reporter.progress();
- }
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/JoinOperator.java Sat Oct 18 00:40:46 2014
@@ -63,7 +63,7 @@ public class JoinOperator extends Common
skewJoinKeyContext.initiliaze(hconf);
skewJoinKeyContext.setSkewJoinJobCounter(skewjoin_followup_jobs);
}
- statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS, skewjoin_followup_jobs);
+ statsMap.put(SkewkeyTableCounter.SKEWJOINFOLLOWUPJOBS.toString(), skewjoin_followup_jobs);
}
@Override
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java Sat Oct 18 00:40:46 2014
@@ -75,10 +75,16 @@ public class MapOperator extends Operato
*
*/
public static enum Counter {
- DESERIALIZE_ERRORS
+ DESERIALIZE_ERRORS,
+ RECORDS_IN
}
private final transient LongWritable deserialize_error_count = new LongWritable();
+ private final transient LongWritable recordCounter = new LongWritable();
+ protected transient long numRows = 0;
+ protected transient long cntr = 1;
+ protected final boolean isInfoEnabled = LOG.isInfoEnabled();
+ protected final boolean isDebugEnabled = LOG.isDebugEnabled();
private final Map<MapInputPath, MapOpCtx> opCtxMap = new HashMap<MapInputPath, MapOpCtx>();
private final Map<Operator<? extends OperatorDesc>, MapOpCtx> childrenOpToOpCtxMap =
@@ -362,7 +368,7 @@ public class MapOperator extends Operato
for (String onealias : aliases) {
Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(onealias);
- if (LOG.isDebugEnabled()) {
+ if (isDebugEnabled) {
LOG.debug("Adding alias " + onealias + " to work list for file "
+ onefile);
}
@@ -380,8 +386,10 @@ public class MapOperator extends Operato
if (!onepath.toUri().relativize(fpath.toUri()).equals(fpath.toUri())) {
children.add(op);
childrenOpToOpCtxMap.put(op, opCtx);
- LOG.info("dump " + op + " "
+ if (isInfoEnabled) {
+ LOG.info("dump " + op + " "
+ opCtxMap.get(inp).rowObjectInspector.getTypeName());
+ }
}
current = opCtx; // just need for TestOperators.testMapOperator
}
@@ -406,7 +414,13 @@ public class MapOperator extends Operato
public void initializeOp(Configuration hconf) throws HiveException {
// set that parent initialization is done and call initialize on children
state = State.INIT;
- statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
+ statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+
+ String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ","_");
+ }
+ statsMap.put(Counter.RECORDS_IN + context, recordCounter);
List<Operator<? extends OperatorDesc>> children = getChildOperators();
@@ -451,6 +465,7 @@ public class MapOperator extends Operato
op.close(abort);
}
}
+ recordCounter.set(numRows);
}
// Find context for current input file
@@ -473,7 +488,9 @@ public class MapOperator extends Operato
MapOpCtx context = opCtxMap.get(inp);
if (context != null) {
current = context;
- LOG.info("Processing alias " + onealias + " for file " + onefile);
+ if (isInfoEnabled) {
+ LOG.info("Processing alias " + onealias + " for file " + onefile);
+ }
return;
}
}
@@ -533,6 +550,13 @@ public class MapOperator extends Operato
// The row has been converted to comply with table schema, irrespective of partition schema.
// So, use tblOI (and not partOI) for forwarding
try {
+ numRows++;
+ if (isInfoEnabled) {
+ if (numRows == cntr) {
+ cntr *= 10;
+ LOG.info(toString() + ": records read - " + numRows);
+ }
+ }
forward(row, current.rowObjectInspector);
} catch (Exception e) {
// Serialize the row and output the error message.
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java Sat Oct 18 00:40:46 2014
@@ -61,6 +61,7 @@ public abstract class Operator<T extends
public static final String HIVECOUNTERCREATEDFILES = "CREATED_FILES";
public static final String HIVECOUNTERFATAL = "FATAL_ERROR";
+ public static final String CONTEXT_NAME_KEY = "__hive.context.name";
private transient Configuration configuration;
protected List<Operator<? extends OperatorDesc>> childOperators;
@@ -210,7 +211,7 @@ public abstract class Operator<T extends
// non-bean ..
- protected transient HashMap<Enum<?>, LongWritable> statsMap = new HashMap<Enum<?>, LongWritable>();
+ protected transient Map<String, LongWritable> statsMap = new HashMap<String, LongWritable>();
@SuppressWarnings("rawtypes")
protected transient OutputCollector out;
protected transient Log LOG = LogFactory.getLog(this.getClass().getName());
@@ -287,9 +288,9 @@ public abstract class Operator<T extends
}
}
- public Map<Enum<?>, Long> getStats() {
- HashMap<Enum<?>, Long> ret = new HashMap<Enum<?>, Long>();
- for (Enum<?> one : statsMap.keySet()) {
+ public Map<String, Long> getStats() {
+ HashMap<String, Long> ret = new HashMap<String, Long>();
+ for (String one : statsMap.keySet()) {
ret.put(one, Long.valueOf(statsMap.get(one).get()));
}
return (ret);
@@ -807,7 +808,7 @@ public abstract class Operator<T extends
}
public void resetStats() {
- for (Enum<?> e : statsMap.keySet()) {
+ for (String e : statsMap.keySet()) {
statsMap.get(e).set(0L);
}
}
@@ -840,7 +841,7 @@ public abstract class Operator<T extends
}
public void logStats() {
- for (Enum<?> e : statsMap.keySet()) {
+ for (String e : statsMap.keySet()) {
LOG.info(e.toString() + ":" + statsMap.get(e).toString());
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ReduceSinkOperator.java Sat Oct 18 00:40:46 2014
@@ -51,6 +51,7 @@ import org.apache.hadoop.hive.serde2.obj
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.util.hash.MurmurHash;
@@ -67,6 +68,13 @@ public class ReduceSinkOperator extends
PTFUtils.makeTransient(ReduceSinkOperator.class, "inputAliases", "valueIndex");
}
+ /**
+ * Counters.
+ */
+ public static enum Counter {
+ RECORDS_OUT_INTERMEDIATE
+ }
+
private static final Log LOG = LogFactory.getLog(ReduceSinkOperator.class.getName());
private static final boolean isInfoEnabled = LOG.isInfoEnabled();
private static final boolean isDebugEnabled = LOG.isDebugEnabled();
@@ -146,9 +154,19 @@ public class ReduceSinkOperator extends
private StructObjectInspector recIdInspector; // OI for the record identifier
private IntObjectInspector bucketInspector; // OI for the bucket field in the record id
+ protected transient long numRows = 0;
+ protected transient long cntr = 1;
+ private final transient LongWritable recordCounter = new LongWritable();
+
@Override
protected void initializeOp(Configuration hconf) throws HiveException {
try {
+ String context = hconf.get(Operator.CONTEXT_NAME_KEY, "");
+ if (context != null && !context.isEmpty()) {
+ context = "_" + context.replace(" ","_");
+ }
+ statsMap.put(Counter.RECORDS_OUT_INTERMEDIATE + context, recordCounter);
+
List<ExprNodeDesc> keys = conf.getKeyCols();
if (isDebugEnabled) {
@@ -508,6 +526,13 @@ public class ReduceSinkOperator extends
// Since this is a terminal operator, update counters explicitly -
// forward is not called
if (null != out) {
+ numRows++;
+ if (isInfoEnabled) {
+ if (numRows == cntr) {
+ cntr *= 10;
+ LOG.info(toString() + ": records written - " + numRows);
+ }
+ }
out.collect(keyWritable, valueWritable);
}
}
@@ -537,6 +562,10 @@ public class ReduceSinkOperator extends
}
super.closeOp(abort);
out = null;
+ if (isInfoEnabled) {
+ LOG.info(toString() + ": records written - " + numRows);
+ }
+ recordCounter.set(numRows);
}
/**
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ScriptOperator.java Sat Oct 18 00:40:46 2014
@@ -236,8 +236,8 @@ public class ScriptOperator extends Oper
protected void initializeOp(Configuration hconf) throws HiveException {
firstRow = true;
- statsMap.put(Counter.DESERIALIZE_ERRORS, deserialize_error_count);
- statsMap.put(Counter.SERIALIZE_ERRORS, serialize_error_count);
+ statsMap.put(Counter.DESERIALIZE_ERRORS.toString(), deserialize_error_count);
+ statsMap.put(Counter.SERIALIZE_ERRORS.toString(), serialize_error_count);
try {
this.hconf = hconf;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecMapper.java Sat Oct 18 00:40:46 2014
@@ -26,8 +26,11 @@ import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.conf.Configuration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.exec.FetchOperator;
import org.apache.hadoop.hive.ql.exec.MapOperator;
import org.apache.hadoop.hive.ql.exec.MapredContext;
@@ -72,9 +75,6 @@ public class ExecMapper extends MapReduc
private static boolean done;
// used to log memory usage periodically
- public static MemoryMXBean memoryMXBean;
- private long numRows = 0;
- private long nextCntr = 1;
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
@@ -84,8 +84,6 @@ public class ExecMapper extends MapReduc
public void configure(JobConf job) {
execContext = new ExecMapperContext(job);
// Allocate the bean at the beginning -
- memoryMXBean = ManagementFactory.getMemoryMXBean();
- l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
isLogInfoEnabled = l4j.isInfoEnabled();
@@ -176,15 +174,6 @@ public class ExecMapper extends MapReduc
// Since there is no concept of a group, we don't invoke
// startGroup/endGroup for a mapper
mo.process((Writable)value);
- if (isLogInfoEnabled) {
- numRows++;
- if (numRows == nextCntr) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processing " + numRows
- + " rows: used memory = " + used_memory);
- nextCntr = getNextCntr(numRows);
- }
- }
}
} catch (Throwable e) {
abort = true;
@@ -198,18 +187,6 @@ public class ExecMapper extends MapReduc
}
}
-
- private long getNextCntr(long cntr) {
- // A very simple counter to keep track of number of rows processed by the
- // reducer. It dumps
- // every 1 million times, and quickly before that
- if (cntr >= 1000000) {
- return cntr + 1000000;
- }
-
- return 10 * cntr;
- }
-
@Override
public void close() {
// No row was processed
@@ -245,13 +222,7 @@ public class ExecMapper extends MapReduc
}
}
- if (isLogInfoEnabled) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
- + used_memory);
- }
-
- ReportStats rps = new ReportStats(rp);
+ ReportStats rps = new ReportStats(rp, jc);
mo.preorderMap(rps);
return;
} catch (Exception e) {
@@ -288,17 +259,21 @@ public class ExecMapper extends MapReduc
*/
public static class ReportStats implements Operator.OperatorFunc {
private final Reporter rp;
+ private final Configuration conf;
+ private final String groupName;
- public ReportStats(Reporter rp) {
+ public ReportStats(Reporter rp, Configuration conf) {
this.rp = rp;
+ this.conf = conf;
+ this.groupName = HiveConf.getVar(conf, HiveConf.ConfVars.HIVECOUNTERGROUP);
}
@Override
public void func(Operator op) {
- Map<Enum<?>, Long> opStats = op.getStats();
- for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
+ Map<String, Long> opStats = op.getStats();
+ for (Map.Entry<String, Long> e : opStats.entrySet()) {
if (rp != null) {
- rp.incrCounter(e.getKey(), e.getValue());
+ rp.incrCounter(groupName, e.getKey(), e.getValue());
}
}
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Sat Oct 18 00:40:46 2014
@@ -70,8 +70,6 @@ public class ExecReducer extends MapRedu
private static final boolean isTraceEnabled = LOG.isTraceEnabled();
private static final String PLAN_KEY = "__REDUCE_PLAN__";
- // used to log memory usage periodically
- private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
// Input value serde needs to be an array to support different SerDe
// for different tags
private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
@@ -86,8 +84,6 @@ public class ExecReducer extends MapRedu
private Reporter rp;
private boolean abort = false;
private boolean isTagged = false;
- private long cntr = 0;
- private long nextCntr = 1;
private TableDesc keyTableDesc;
private TableDesc[] valueTableDesc;
private ObjectInspector[] rowObjectInspector;
@@ -103,8 +99,6 @@ public class ExecReducer extends MapRedu
ObjectInspector keyObjectInspector;
if (isInfoEnabled) {
- LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
try {
LOG.info("conf classpath = "
+ Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
@@ -245,17 +239,7 @@ public class ExecReducer extends MapRedu
row.clear();
row.add(keyObject);
row.add(valueObject[tag]);
- if (isInfoEnabled) {
- cntr++;
- if (cntr == nextCntr) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- if (isInfoEnabled) {
- LOG.info("ExecReducer: processing " + cntr
- + " rows: used memory = " + used_memory);
- }
- nextCntr = getNextCntr(cntr);
- }
- }
+
try {
reducer.processOp(row, tag);
} catch (Exception e) {
@@ -283,17 +267,6 @@ public class ExecReducer extends MapRedu
}
}
- private long getNextCntr(long cntr) {
- // A very simple counter to keep track of number of rows processed by the
- // reducer. It dumps
- // every 1 million times, and quickly before that
- if (cntr >= 1000000) {
- return cntr + 1000000;
- }
-
- return 10 * cntr;
- }
-
@Override
public void close() {
@@ -310,13 +283,9 @@ public class ExecReducer extends MapRedu
}
reducer.endGroup();
}
- if (isInfoEnabled) {
- LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
- + memoryMXBean.getHeapMemoryUsage().getUsed());
- }
reducer.close(abort);
- ReportStats rps = new ReportStats(rp);
+ ReportStats rps = new ReportStats(rp, jc);
reducer.preorderMap(rps);
} catch (Exception e) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Sat Oct 18 00:40:46 2014
@@ -50,6 +50,7 @@ import org.apache.hadoop.hive.conf.HiveC
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.exec.CommonMergeJoinOperator;
+import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
@@ -178,6 +179,8 @@ public class DagUtils {
private JobConf initializeVertexConf(JobConf baseConf, Context context, MapWork mapWork) {
JobConf conf = new JobConf(baseConf);
+ conf.set(Operator.CONTEXT_NAME_KEY, mapWork.getName());
+
if (mapWork.getNumMapTasks() != null) {
// Is this required ?
conf.setInt(MRJobConfig.NUM_MAPS, mapWork.getNumMapTasks().intValue());
@@ -653,6 +656,8 @@ public class DagUtils {
private JobConf initializeVertexConf(JobConf baseConf, Context context, ReduceWork reduceWork) {
JobConf conf = new JobConf(baseConf);
+ conf.set(Operator.CONTEXT_NAME_KEY, reduceWork.getName());
+
// Is this required ?
conf.set("mapred.reducer.class", ExecReducer.class.getName());
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MapRecordProcessor.java Sat Oct 18 00:40:46 2014
@@ -269,11 +269,7 @@ public class MapRecordProcessor extends
@Override
void run() throws Exception {
- while (sources[position].pushRecord()) {
- if (isLogInfoEnabled) {
- logProgress();
- }
- }
+ while (sources[position].pushRecord()) {}
}
@Override
@@ -305,10 +301,7 @@ public class MapRecordProcessor extends
}
}
- if (isLogInfoEnabled) {
- logCloseInfo();
- }
- ReportStats rps = new ReportStats(reporter);
+ ReportStats rps = new ReportStats(reporter, jconf);
mapOp.preorderMap(rps);
return;
} catch (Exception e) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java Sat Oct 18 00:40:46 2014
@@ -156,10 +156,7 @@ public class MergeFileRecordProcessor ex
}
mergeOp.close(abort);
- if (isLogInfoEnabled) {
- logCloseInfo();
- }
- ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter);
+ ExecMapper.ReportStats rps = new ExecMapper.ReportStats(reporter, jconf);
mergeOp.preorderMap(rps);
} catch (Exception e) {
if (!abort) {
@@ -190,9 +187,6 @@ public class MergeFileRecordProcessor ex
row[0] = key;
row[1] = value;
mergeOp.processOp(row, 0);
- if (isLogInfoEnabled) {
- logProgress();
- }
}
} catch (Throwable e) {
abort = true;
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/RecordProcessor.java Sat Oct 18 00:40:46 2014
@@ -52,13 +52,10 @@ public abstract class RecordProcessor {
// used to log memory usage periodically
- public static MemoryMXBean memoryMXBean;
protected boolean isLogInfoEnabled = false;
protected boolean isLogTraceEnabled = false;
protected MRTaskReporter reporter;
- private long numRows = 0;
- private long nextUpdateCntr = 1;
protected PerfLogger perfLogger = PerfLogger.getPerfLogger();
protected String CLASS_NAME = RecordProcessor.class.getName();
@@ -79,11 +76,6 @@ public abstract class RecordProcessor {
this.outputs = outputs;
this.processorContext = processorContext;
- // Allocate the bean at the beginning -
- memoryMXBean = ManagementFactory.getMemoryMXBean();
-
- l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
isLogInfoEnabled = l4j.isInfoEnabled();
isLogTraceEnabled = l4j.isTraceEnabled();
@@ -110,37 +102,6 @@ public abstract class RecordProcessor {
abstract void close();
- /**
- * Log information to be logged at the end
- */
- protected void logCloseInfo() {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("TezProcessor: processed " + numRows + " rows/groups: used memory = " + used_memory);
- }
-
- /**
- * Log number of records processed and memory used after processing many records
- */
- protected void logProgress() {
- numRows++;
- if (numRows == nextUpdateCntr) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("TezProcessor: processing " + numRows + " rows/groups: used memory = " + used_memory);
- nextUpdateCntr = getNextUpdateRecordCounter(numRows);
- }
- }
-
- private long getNextUpdateRecordCounter(long cntr) {
- // A very simple counter to keep track of number of rows processed by the
- // reducer. It dumps
- // every 1 million times, and quickly before that
- if (cntr >= 1000000) {
- return cntr + 1000000;
- }
-
- return 10 * cntr;
- }
-
protected void createOutputMap() {
Preconditions.checkState(outMap == null, "Outputs should only be setup once");
outMap = Maps.newHashMap();
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Sat Oct 18 00:40:46 2014
@@ -165,11 +165,7 @@ public class ReduceRecordProcessor exte
}
// run the operator pipeline
- while (sources[position].pushRecord()) {
- if (isLogInfoEnabled) {
- logProgress();
- }
- }
+ while (sources[position].pushRecord()) {}
}
/**
@@ -208,7 +204,7 @@ public class ReduceRecordProcessor exte
dummyOp.close(abort);
}
}
- ReportStats rps = new ReportStats(reporter);
+ ReportStats rps = new ReportStats(reporter, jconf);
reducer.preorderMap(rps);
} catch (Exception e) {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorFilterOperator.java Sat Oct 18 00:40:46 2014
@@ -60,8 +60,6 @@ public class VectorFilterOperator extend
try {
heartbeatInterval = HiveConf.getIntVar(hconf,
HiveConf.ConfVars.HIVESENDHEARTBEAT);
- statsMap.put(Counter.FILTERED, filtered_count);
- statsMap.put(Counter.PASSED, passed_count);
} catch (Throwable e) {
throw new HiveException(e);
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapOperator.java Sat Oct 18 00:40:46 2014
@@ -40,6 +40,13 @@ public class VectorMapOperator extends M
// The row has been converted to comply with table schema, irrespective of partition schema.
// So, use tblOI (and not partOI) for forwarding
try {
+ if (isInfoEnabled) {
+ numRows += ((VectorizedRowBatch)value).size;
+ while (numRows > cntr) {
+ cntr *= 10;
+ LOG.info(toString() + ": records read - " + numRows);
+ }
+ }
forward(value, current.getRowObjectInspector());
} catch (Exception e) {
throw new HiveException("Hive Runtime Error while processing row ", e);
Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java?rev=1632713&r1=1632712&r2=1632713&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/exec/TestOperators.java Sat Oct 18 00:40:46 2014
@@ -109,55 +109,6 @@ public class TestOperators extends TestC
}
}
- public void testBaseFilterOperator() throws Throwable {
- try {
- System.out.println("Testing Filter Operator");
- ExprNodeDesc col0 = TestExecDriver.getStringColumn("col0");
- ExprNodeDesc col1 = TestExecDriver.getStringColumn("col1");
- ExprNodeDesc col2 = TestExecDriver.getStringColumn("col2");
- ExprNodeDesc zero = new ExprNodeConstantDesc("0");
- ExprNodeDesc func1 = TypeCheckProcFactory.DefaultExprProcessor
- .getFuncExprNodeDesc(">", col2, col1);
- ExprNodeDesc func2 = TypeCheckProcFactory.DefaultExprProcessor
- .getFuncExprNodeDesc("==", col0, zero);
- ExprNodeDesc func3 = TypeCheckProcFactory.DefaultExprProcessor
- .getFuncExprNodeDesc("and", func1, func2);
- assert (func3 != null);
- FilterDesc filterCtx = new FilterDesc(func3, false);
-
- // Configuration
- Operator<FilterDesc> op = OperatorFactory.get(FilterDesc.class);
- op.setConf(filterCtx);
-
- // runtime initialization
- op.initialize(new JobConf(TestOperators.class),
- new ObjectInspector[] {r[0].oi});
-
- for (InspectableObject oner : r) {
- op.processOp(oner.o, 0);
- }
-
- Map<Enum<?>, Long> results = op.getStats();
- System.out.println("filtered = "
- + results.get(FilterOperator.Counter.FILTERED));
- assertEquals(Long.valueOf(4), results
- .get(FilterOperator.Counter.FILTERED));
- System.out.println("passed = "
- + results.get(FilterOperator.Counter.PASSED));
- assertEquals(Long.valueOf(1), results.get(FilterOperator.Counter.PASSED));
-
- /*
- * for(Enum e: results.keySet()) { System.out.println(e.toString() + ":" +
- * results.get(e)); }
- */
- System.out.println("Filter Operator ok");
-
- } catch (Throwable e) {
- e.printStackTrace();
- throw e;
- }
- }
-
private void testTaskIds(String [] taskIds, String expectedAttemptId, String expectedTaskId) {
Configuration conf = new JobConf(TestOperators.class);
for (String one: taskIds) {