Posted to commits@hive.apache.org by na...@apache.org on 2014/08/01 07:32:54 UTC
svn commit: r1615036 -
/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
Author: navis
Date: Fri Aug 1 05:32:54 2014
New Revision: 1615036
URL: http://svn.apache.org/r1615036
Log:
HIVE-7562 : Cleanup ExecReducer (Brock Noland reviewed by Szehon Ho, Navis)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1615036&r1=1615035&r2=1615036&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Fri Aug 1 05:32:54 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.metadat
import org.apache.hadoop.hive.ql.plan.ReduceWork;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeUtils;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -56,7 +55,7 @@ import org.apache.hadoop.util.StringUtil
/**
* ExecReducer is the generic Reducer class for Hive. Together with ExecMapper it is
* the bridge between the map-reduce framework and the Hive operator pipeline at
- * execution time. It's main responsabilities are:
+ * execution time. It's main responsibilities are:
*
* - Load and setup the operator pipeline from XML
* - Run the pipeline by transforming key, value pairs to records and forwarding them to the operators
@@ -66,8 +65,20 @@ import org.apache.hadoop.util.StringUtil
*/
public class ExecReducer extends MapReduceBase implements Reducer {
+ private static final Log LOG = LogFactory.getLog("ExecReducer");
private static final String PLAN_KEY = "__REDUCE_PLAN__";
+ // used to log memory usage periodically
+ private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+ // Input value serde needs to be an array to support different SerDe
+ // for different tags
+ private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
+ private final Object[] valueObject = new Object[Byte.MAX_VALUE];
+ private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+ private final boolean isLogInfoEnabled = LOG.isInfoEnabled();
+
+ // TODO: move to DynamicSerDe when it's ready
+ private Deserializer inputKeyDeserializer;
private JobConf jc;
private OutputCollector<?, ?> oc;
private Operator<?> reducer;
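
A note on the Byte.MAX_VALUE sizing in the hunk above: on the reduce side, each incoming value row carries a one-byte tag identifying its producer (for example, one tag per input of a join), and that tag doubles as a direct index into the per-tag deserializer and value arrays. A minimal sketch of the lookup this layout supports (illustrative names, not code from this patch):

    // Illustrative only: a one-byte, non-negative tag (0..126) indexes
    // straight into an array sized Byte.MAX_VALUE, one slot per possible tag.
    class TagLookupSketch {
      private final Object[] deserializerForTag = new Object[Byte.MAX_VALUE];

      Object deserializerFor(byte tag) {
        return deserializerForTag[tag];   // tags are never negative
      }
    }
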
@@ -76,23 +87,13 @@ public class ExecReducer extends MapRedu
private boolean isTagged = false;
private long cntr = 0;
private long nextCntr = 1;
-
- public static final Log l4j = LogFactory.getLog("ExecReducer");
- private boolean isLogInfoEnabled = false;
-
- // used to log memory usage periodically
- private MemoryMXBean memoryMXBean;
-
- // TODO: move to DynamicSerDe when it's ready
- private Deserializer inputKeyDeserializer;
- // Input value serde needs to be an array to support different SerDe
- // for different tags
- private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
-
- TableDesc keyTableDesc;
- TableDesc[] valueTableDesc;
-
- ObjectInspector[] rowObjectInspector;
+ private TableDesc keyTableDesc;
+ private TableDesc[] valueTableDesc;
+ private ObjectInspector[] rowObjectInspector;
+
+ // runtime objects
+ private transient Object keyObject;
+ private transient BytesWritable groupKey;
@Override
public void configure(JobConf job) {
@@ -100,20 +101,16 @@ public class ExecReducer extends MapRedu
ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
ObjectInspector keyObjectInspector;
- // Allocate the bean at the beginning -
- memoryMXBean = ManagementFactory.getMemoryMXBean();
- l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
- isLogInfoEnabled = l4j.isInfoEnabled();
+ LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
try {
- l4j.info("conf classpath = "
+ LOG.info("conf classpath = "
+ Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
- l4j.info("thread classpath = "
+ LOG.info("thread classpath = "
+ Arrays.asList(((URLClassLoader) Thread.currentThread()
.getContextClassLoader()).getURLs()));
} catch (Exception e) {
- l4j.info("cannot get classpath: " + e.getMessage());
+ LOG.info("cannot get classpath: " + e.getMessage());
}
jc = job;
@@ -132,7 +129,7 @@ public class ExecReducer extends MapRedu
isTagged = gWork.getNeedsTagging();
try {
keyTableDesc = gWork.getKeyDesc();
- inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+ inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
.getDeserializerClass(), null);
SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
keyObjectInspector = inputKeyDeserializer.getObjectInspector();
@@ -140,7 +137,7 @@ public class ExecReducer extends MapRedu
for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
// We should initialize the SerDe with the TypeInfo when available.
valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
- inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+ inputValueDeserializer[tag] = ReflectionUtils.newInstance(
valueTableDesc[tag].getDeserializerClass(), null);
SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
valueTableDesc[tag].getProperties(), null);
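
The (SerDe) casts dropped in the two hunks above are unnecessary because Hadoop's ReflectionUtils.newInstance is generic, public static <T> T newInstance(Class<T> theClass, Configuration conf), so when the class token is already typed as a Deserializer subtype the result is a Deserializer with no cast. A minimal sketch, assuming getDeserializerClass() returns Class<? extends Deserializer> (which is what the now-removed cast suggests):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.util.ReflectionUtils;

    class NewInstanceSketch {
      // T binds to the Deserializer subtype carried by the class token,
      // so the returned instance is already a Deserializer.
      static Deserializer create(Class<? extends Deserializer> clazz,
                                 Configuration conf) {
        return ReflectionUtils.newInstance(clazz, conf);
      }
    }
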
@@ -162,7 +159,7 @@ public class ExecReducer extends MapRedu
// initialize reduce operator tree
try {
- l4j.info(reducer.dump(0));
+ LOG.info(reducer.dump(0));
reducer.initialize(jc, rowObjectInspector);
} catch (Throwable e) {
abort = true;
@@ -175,13 +172,6 @@ public class ExecReducer extends MapRedu
}
}
- private Object keyObject;
- private final Object[] valueObject = new Object[Byte.MAX_VALUE];
-
- private BytesWritable groupKey;
-
- List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
-
public void reduce(Object key, Iterator values, OutputCollector output,
Reporter reporter) throws IOException {
if (reducer.getDone()) {
@@ -212,7 +202,7 @@ public class ExecReducer extends MapRedu
groupKey = new BytesWritable();
} else {
// If a operator wants to do some work at the end of a group
- l4j.trace("End Group");
+ LOG.trace("End Group");
reducer.endGroup();
}
@@ -227,7 +217,7 @@ public class ExecReducer extends MapRedu
}
groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
- l4j.trace("Start Group");
+ LOG.trace("Start Group");
reducer.setGroupKeyObject(keyObject);
reducer.startGroup();
}
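
For context on the startGroup/endGroup calls in the last two hunks: reduce keys arrive sorted, so ExecReducer detects a group boundary by comparing the serialized bytes of the incoming key against the cached groupKey, ending the previous group and starting a new one whenever they differ. A simplified sketch of that boundary pattern (names and accessors simplified; the patch itself uses the older get()/getSize() API):

    import org.apache.hadoop.io.BytesWritable;

    class GroupBoundarySketch {
      private BytesWritable groupKey;   // null until the first key arrives

      void onKey(BytesWritable keyWritable) {
        if (groupKey == null) {
          groupKey = new BytesWritable();       // first key of this task
        } else if (!groupKey.equals(keyWritable)) {
          endGroup();                           // previous group is complete
        } else {
          return;                               // same group, nothing to do
        }
        groupKey.set(keyWritable.getBytes(), 0, keyWritable.getLength());
        startGroup();
      }

      void startGroup() { /* operator-tree hook */ }
      void endGroup()   { /* operator-tree hook */ }
    }
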
@@ -253,7 +243,7 @@ public class ExecReducer extends MapRedu
cntr++;
if (cntr == nextCntr) {
long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecReducer: processing " + cntr
+ LOG.info("ExecReducer: processing " + cntr
+ " rows: used memory = " + used_memory);
nextCntr = getNextCntr(cntr);
}
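
The counter logic above reports memory usage at geometrically spaced row counts, so the logging cost stays negligible on large inputs. getNextCntr itself is not part of this diff; a sketch consistent with the surrounding calls, assuming Hive's usual growth pattern, might look like:

    class NextCntrSketch {
      // Assumption, not the committed body: grow the reporting threshold
      // tenfold up to 1,000,000 rows, then report every 1,000,000 rows.
      long getNextCntr(long cntr) {
        if (cntr >= 1000000) {
          return cntr + 1000000;
        }
        return 10 * cntr;
      }
    }
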
@@ -279,7 +269,7 @@ public class ExecReducer extends MapRedu
// Don't create a new object if we are already out of memory
throw (OutOfMemoryError) e;
} else {
- l4j.fatal(StringUtils.stringifyException(e));
+ LOG.fatal(StringUtils.stringifyException(e));
throw new RuntimeException(e);
}
}
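
The catch block above follows a common out-of-memory convention: rethrow OutOfMemoryError as-is, because allocating a wrapper exception on the OOM path could itself fail, and wrap everything else. The pattern in isolation (a sketch, not patch code):

    class OomSafeCatchSketch {
      void run() {
        try {
          processRow();
        } catch (Throwable e) {
          if (e instanceof OutOfMemoryError) {
            // allocating a wrapper here could itself throw OOM
            throw (OutOfMemoryError) e;
          }
          throw new RuntimeException(e);
        }
      }

      void processRow() { /* hot path */ }
    }
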
@@ -301,17 +291,17 @@ public class ExecReducer extends MapRedu
// No row was processed
if (oc == null) {
- l4j.trace("Close called no row");
+ LOG.trace("Close called without any rows processed");
}
try {
if (groupKey != null) {
// If a operator wants to do some work at the end of a group
- l4j.trace("End Group");
+ LOG.trace("End Group");
reducer.endGroup();
}
if (isLogInfoEnabled) {
- l4j.info("ExecReducer: processed " + cntr + " rows: used memory = "
+ LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
+ memoryMXBean.getHeapMemoryUsage().getUsed());
}
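
One subtlety behind the groupKey != null check above: endGroup() for a given key normally runs only when the next key arrives in reduce(), so the final group of the task has no successor to trigger it and must be flushed in close(). A sketch of that lifecycle (hypothetical names, not patch code):

    class LastGroupFlushSketch {
      private Object groupKey;    // null until the first row arrives

      // called once by the framework after the last reduce() invocation
      public void close() {
        if (groupKey != null) {
          endGroup();             // the last group never saw a "next key"
        }
      }

      void endGroup() { /* operator-tree hook */ }
    }
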
@@ -322,7 +312,7 @@ public class ExecReducer extends MapRedu
} catch (Exception e) {
if (!abort) {
// signal new failure to map-reduce
- l4j.error("Hit error while closing operators - failing tree");
+ LOG.error("Hit error while closing operators - failing tree");
throw new RuntimeException("Hive Runtime Error while closing operators: "
+ e.getMessage(), e);
}