Posted to commits@hive.apache.org by na...@apache.org on 2014/08/01 07:32:54 UTC

svn commit: r1615036 - /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java

Author: navis
Date: Fri Aug  1 05:32:54 2014
New Revision: 1615036

URL: http://svn.apache.org/r1615036
Log:
HIVE-7562 : Cleanup ExecReducer (Brock Noland reviewed by Szehon Ho, Navis)

Modified:
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
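
For reference, the cleanup below makes the logger a private static final
field (replacing the public "l4j"), caches isLogInfoEnabled once at
construction, allocates the MemoryMXBean eagerly, narrows the per-tag
deserializer array from SerDe[] to Deserializer[] (dropping the now
redundant (SerDe) casts and the unused import), tightens field visibility,
and groups the runtime-only state together. A minimal sketch of the logging
idiom the patch standardizes on (class and method names here are
illustrative, not Hive code):

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;

    public class LoggingIdiom {
      // One shared, immutable logger per class.
      private static final Log LOG = LogFactory.getLog("LoggingIdiom");
      // Cache the level check once; a cheap guard for hot code paths.
      private final boolean isLogInfoEnabled = LOG.isInfoEnabled();

      void processRow(long cntr) {
        if (isLogInfoEnabled) {
          // The string concatenation is skipped entirely when INFO is off.
          LOG.info("processed " + cntr + " rows");
        }
      }
    }

Caching the level check in a final field trades runtime reconfigurability
of the log level for a branch-cheap guard inside the per-row loop.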

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java?rev=1615036&r1=1615035&r2=1615036&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/ExecReducer.java Fri Aug  1 05:32:54 2014
@@ -39,7 +39,6 @@ import org.apache.hadoop.hive.ql.metadat
 import org.apache.hadoop.hive.ql.plan.ReduceWork;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.Deserializer;
-import org.apache.hadoop.hive.serde2.SerDe;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
@@ -56,7 +55,7 @@ import org.apache.hadoop.util.StringUtil
 /**
  * ExecReducer is the generic Reducer class for Hive. Together with ExecMapper it is
  * the bridge between the map-reduce framework and the Hive operator pipeline at
- * execution time. It's main responsabilities are:
+ * execution time. It's main responsibilities are:
  *
  * - Load and setup the operator pipeline from XML
  * - Run the pipeline by transforming key, value pairs to records and forwarding them to the operators
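
To make the bridge concrete, the lifecycle looks like the following
skeleton in the same old-style org.apache.hadoop.mapred API that
ExecReducer implements (the class name and comments are illustrative
placeholders, not Hive code):

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.MapReduceBase;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reducer;
    import org.apache.hadoop.mapred.Reporter;

    public class SkeletalReducer extends MapReduceBase
        implements Reducer<Object, Object, Object, Object> {
      @Override
      public void configure(JobConf job) {
        // 1. Load and set up the operator pipeline from the serialized plan.
      }

      @Override
      public void reduce(Object key, Iterator<Object> values,
          OutputCollector<Object, Object> output, Reporter reporter)
          throws IOException {
        // 2. Turn key/value pairs into rows and forward them to the operators.
      }

      @Override
      public void close() {
        // 3. Flush and close the operator tree.
      }
    }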
@@ -66,8 +65,20 @@ import org.apache.hadoop.util.StringUtil
  */
 public class ExecReducer extends MapReduceBase implements Reducer {
 
+  private static final Log LOG = LogFactory.getLog("ExecReducer");
   private static final String PLAN_KEY = "__REDUCE_PLAN__";
 
+  // used to log memory usage periodically
+  private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+  // Input value serde needs to be an array to support different SerDe
+  // for different tags
+  private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
+  private final Object[] valueObject = new Object[Byte.MAX_VALUE];
+  private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+  private final boolean isLogInfoEnabled = LOG.isInfoEnabled();
+
+  // TODO: move to DynamicSerDe when it's ready
+  private Deserializer inputKeyDeserializer;
   private JobConf jc;
   private OutputCollector<?, ?> oc;
   private Operator<?> reducer;
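
The deserializer and value arrays are sized Byte.MAX_VALUE because join
inputs are distinguished by a one-byte tag, and each tag may use a
different on-disk format. A hypothetical illustration of the per-tag
dispatch (not code from this file):

    import org.apache.hadoop.hive.serde2.Deserializer;
    import org.apache.hadoop.hive.serde2.SerDeException;
    import org.apache.hadoop.io.Writable;

    public class TagDispatch {
      private final Deserializer[] inputValueDeserializer =
          new Deserializer[Byte.MAX_VALUE];

      Object deserializeForTag(byte tag, Writable value) throws SerDeException {
        // Each join input (tag) gets its own deserializer rather than a
        // single shared one.
        return inputValueDeserializer[tag].deserialize(value);
      }
    }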
@@ -76,23 +87,13 @@ public class ExecReducer extends MapRedu
   private boolean isTagged = false;
   private long cntr = 0;
   private long nextCntr = 1;
-
-  public static final Log l4j = LogFactory.getLog("ExecReducer");
-  private boolean isLogInfoEnabled = false;
-
-  // used to log memory usage periodically
-  private MemoryMXBean memoryMXBean;
-
-  // TODO: move to DynamicSerDe when it's ready
-  private Deserializer inputKeyDeserializer;
-  // Input value serde needs to be an array to support different SerDe
-  // for different tags
-  private final SerDe[] inputValueDeserializer = new SerDe[Byte.MAX_VALUE];
-
-  TableDesc keyTableDesc;
-  TableDesc[] valueTableDesc;
-
-  ObjectInspector[] rowObjectInspector;
+  private TableDesc keyTableDesc;
+  private TableDesc[] valueTableDesc;
+  private ObjectInspector[] rowObjectInspector;
+
+  // runtime objects
+  private transient Object keyObject;
+  private transient BytesWritable groupKey;
 
   @Override
   public void configure(JobConf job) {
@@ -100,20 +101,16 @@ public class ExecReducer extends MapRedu
     ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
     ObjectInspector keyObjectInspector;
 
-    // Allocate the bean at the beginning -
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
-
-    isLogInfoEnabled = l4j.isInfoEnabled();
+    LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
 
     try {
-      l4j.info("conf classpath = "
+      LOG.info("conf classpath = "
           + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
-      l4j.info("thread classpath = "
+      LOG.info("thread classpath = "
           + Arrays.asList(((URLClassLoader) Thread.currentThread()
           .getContextClassLoader()).getURLs()));
     } catch (Exception e) {
-      l4j.info("cannot get classpath: " + e.getMessage());
+      LOG.info("cannot get classpath: " + e.getMessage());
     }
     jc = job;
 
@@ -132,7 +129,7 @@ public class ExecReducer extends MapRedu
     isTagged = gWork.getNeedsTagging();
     try {
       keyTableDesc = gWork.getKeyDesc();
-      inputKeyDeserializer = (SerDe) ReflectionUtils.newInstance(keyTableDesc
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
           .getDeserializerClass(), null);
       SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
       keyObjectInspector = inputKeyDeserializer.getObjectInspector();
@@ -140,7 +137,7 @@ public class ExecReducer extends MapRedu
       for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
         // We should initialize the SerDe with the TypeInfo when available.
         valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
-        inputValueDeserializer[tag] = (SerDe) ReflectionUtils.newInstance(
+        inputValueDeserializer[tag] = ReflectionUtils.newInstance(
             valueTableDesc[tag].getDeserializerClass(), null);
         SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
                                    valueTableDesc[tag].getProperties(), null);
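
The (SerDe) casts could be dropped because ReflectionUtils.newInstance is
generic, returning T for a Class<T>, so a Class token already typed as a
Deserializer subtype needs no cast. A small demonstration of the same
principle with a JDK type (assuming only Hadoop's ReflectionUtils on the
classpath):

    import org.apache.hadoop.util.ReflectionUtils;

    public class NewInstanceDemo {
      public static void main(String[] args) {
        // newInstance(Class<T>, Configuration) is generic, so no cast is
        // needed when the Class token is typed as a subtype of the target.
        Class<? extends CharSequence> clazz = StringBuilder.class;
        CharSequence cs = ReflectionUtils.newInstance(clazz, null);
        System.out.println(cs.getClass().getName());  // java.lang.StringBuilder
      }
    }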
@@ -162,7 +159,7 @@ public class ExecReducer extends MapRedu
 
     // initialize reduce operator tree
     try {
-      l4j.info(reducer.dump(0));
+      LOG.info(reducer.dump(0));
       reducer.initialize(jc, rowObjectInspector);
     } catch (Throwable e) {
       abort = true;
@@ -175,13 +172,6 @@ public class ExecReducer extends MapRedu
     }
   }
 
-  private Object keyObject;
-  private final Object[] valueObject = new Object[Byte.MAX_VALUE];
-
-  private BytesWritable groupKey;
-
-  List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
-
   public void reduce(Object key, Iterator values, OutputCollector output,
       Reporter reporter) throws IOException {
     if (reducer.getDone()) {
@@ -212,7 +202,7 @@ public class ExecReducer extends MapRedu
           groupKey = new BytesWritable();
         } else {
           // If a operator wants to do some work at the end of a group
-          l4j.trace("End Group");
+          LOG.trace("End Group");
           reducer.endGroup();
         }
 
@@ -227,7 +217,7 @@ public class ExecReducer extends MapRedu
         }
 
         groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
-        l4j.trace("Start Group");
+        LOG.trace("Start Group");
         reducer.setGroupKeyObject(keyObject);
         reducer.startGroup();
       }
@@ -253,7 +243,7 @@ public class ExecReducer extends MapRedu
           cntr++;
           if (cntr == nextCntr) {
             long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            l4j.info("ExecReducer: processing " + cntr
+            LOG.info("ExecReducer: processing " + cntr
                 + " rows: used memory = " + used_memory);
             nextCntr = getNextCntr(cntr);
           }
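
getNextCntr is not part of this hunk; the intent is to log on an
exponentially growing schedule so the per-row bookkeeping stays negligible.
A hypothetical reconstruction of that cadence (the actual method may
differ):

    public class ProgressCadence {
      // Grow the threshold 10x per log line up to one million rows,
      // then log every additional million rows.
      static long getNextCntr(long cntr) {
        if (cntr >= 1000000) {
          return cntr + 1000000;
        }
        return cntr * 10;
      }
    }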
@@ -279,7 +269,7 @@ public class ExecReducer extends MapRedu
         // Don't create a new object if we are already out of memory
         throw (OutOfMemoryError) e;
       } else {
-        l4j.fatal(StringUtils.stringifyException(e));
+        LOG.fatal(StringUtils.stringifyException(e));
         throw new RuntimeException(e);
       }
     }
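
The rethrow-without-wrapping branch above exists because constructing a new
exception allocates, and that allocation can itself fail when the heap is
already exhausted. A minimal sketch of the pattern (the guard here is
simplified relative to the surrounding code):

    public class OomSafeRethrow {
      static void run(Runnable body) {
        try {
          body.run();
        } catch (Throwable e) {
          if (e instanceof OutOfMemoryError) {
            // Rethrow as-is: wrapping would allocate a new object while
            // the heap is exhausted.
            throw (OutOfMemoryError) e;
          }
          throw new RuntimeException(e);
        }
      }
    }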
@@ -301,17 +291,17 @@ public class ExecReducer extends MapRedu
 
     // No row was processed
     if (oc == null) {
-      l4j.trace("Close called no row");
+      LOG.trace("Close called without any rows processed");
     }
 
     try {
       if (groupKey != null) {
         // If a operator wants to do some work at the end of a group
-        l4j.trace("End Group");
+        LOG.trace("End Group");
         reducer.endGroup();
       }
       if (isLogInfoEnabled) {
-        l4j.info("ExecReducer: processed " + cntr + " rows: used memory = "
+        LOG.info("ExecReducer: processed " + cntr + " rows: used memory = "
             + memoryMXBean.getHeapMemoryUsage().getUsed());
       }
 
@@ -322,7 +312,7 @@ public class ExecReducer extends MapRedu
     } catch (Exception e) {
       if (!abort) {
         // signal new failure to map-reduce
-        l4j.error("Hit error while closing operators - failing tree");
+        LOG.error("Hit error while closing operators - failing tree");
         throw new RuntimeException("Hive Runtime Error while closing operators: "
             + e.getMessage(), e);
       }