Posted to commits@hive.apache.org by br...@apache.org on 2014/08/15 06:28:01 UTC

svn commit: r1618094 - /hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/

Author: brock
Date: Fri Aug 15 04:28:01 2014
New Revision: 1618094

URL: http://svn.apache.org/r1618094
Log:
HIVE-7677 - Implement native HiveReduceFunction (Chengxiang Li via Brock) [Spark Branch]

Added:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
Modified:
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
    hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
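
For orientation, the reduce-side wiring introduced here looks roughly like the sketch
below. It only restates what the HiveReduceFunction.java hunk further down shows; the
wrapper class name is hypothetical, and jobConf and the shuffled-input iterator are
assumed to be supplied by HiveReduceFunction's call() method.

    package org.apache.hadoop.hive.ql.exec.spark;

    import java.util.Iterator;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.Reporter;

    import scala.Tuple2;

    // Hypothetical helper mirroring the new body of HiveReduceFunction.call():
    // a SparkReduceRecordHandler replaces the direct use of ExecReducer.
    public final class ReduceWiringSketch {
      public static HiveReduceFunctionResultList wire(JobConf jobConf,
          Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> it) {
        SparkReduceRecordHandler handler = new SparkReduceRecordHandler();
        // The result list doubles as the OutputCollector passed to init().
        HiveReduceFunctionResultList result =
            new HiveReduceFunctionResultList(jobConf, it, handler);
        handler.init(jobConf, result, Reporter.NULL);
        return result;
      }
    }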

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1618094&r1=1618093&r2=1618094&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Fri Aug 15 04:28:01 2014
@@ -44,7 +44,7 @@ public class HiveMapFunctionResultList e
   @Override
   protected void processNextRecord(Tuple2<BytesWritable, BytesWritable> inputRecord)
       throws IOException {
-    recordHandler.process(inputRecord._2());
+    recordHandler.processRow(inputRecord._2());
   }
 
   @Override

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1618094&r1=1618093&r2=1618094&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Fri Aug 15 04:28:01 2014
@@ -48,8 +48,10 @@ BytesWritable, BytesWritable> {
       jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
     }
 
-    ExecReducer reducer = new ExecReducer();
-    reducer.configure(jobConf);
-    return new HiveReduceFunctionResultList(jobConf, it, reducer);
+    SparkReduceRecordHandler reducerRecordHandler = new SparkReduceRecordHandler();
+    HiveReduceFunctionResultList result = new HiveReduceFunctionResultList(jobConf, it, reducerRecordHandler);
+    reducerRecordHandler.init(jobConf, result, Reporter.NULL);
+
+    return result;
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java?rev=1618094&r1=1618093&r2=1618094&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java Fri Aug 15 04:28:01 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.s
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
 import org.apache.hadoop.hive.ql.plan.OperatorDesc;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.Reporter;
@@ -32,7 +31,7 @@ import java.util.Iterator;
 
 public class HiveReduceFunctionResultList extends
     HiveBaseFunctionResultList<Tuple2<BytesWritable, Iterable<BytesWritable>>> {
-  private final ExecReducer reducer;
+  private final SparkReduceRecordHandler reduceRecordHandler;
 
   /**
    * Instantiate result set Iterable for Reduce function output.
@@ -42,16 +41,16 @@ public class HiveReduceFunctionResultLis
    */
   public HiveReduceFunctionResultList(Configuration conf,
       Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> inputIterator,
-      ExecReducer reducer) {
+      SparkReduceRecordHandler reducer) {
     super(conf, inputIterator);
-    this.reducer = reducer;
+    this.reduceRecordHandler = reducer;
     setOutputCollector();
   }
 
   @Override
   protected void processNextRecord(Tuple2<BytesWritable, Iterable<BytesWritable>> inputRecord)
       throws IOException {
-    reducer.reduce(inputRecord._1(), inputRecord._2().iterator(), this, Reporter.NULL);
+    reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2().iterator());
   }
 
   @Override
@@ -61,13 +60,13 @@ public class HiveReduceFunctionResultLis
 
   @Override
   protected void closeRecordProcessor() {
-    reducer.close();
+    reduceRecordHandler.close();
   }
 
   private void setOutputCollector() {
-    if (reducer != null && reducer.getReducer() != null) {
+    if (reduceRecordHandler != null && reduceRecordHandler.getReducer() != null) {
       OperatorUtils.setChildrenCollector(
-          Arrays.<Operator<? extends OperatorDesc>>asList(reducer.getReducer()), this);
+          Arrays.<Operator<? extends OperatorDesc>>asList(reduceRecordHandler.getReducer()), this);
     }
   }
 }

Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1618094&r1=1618093&r2=1618094&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Fri Aug 15 04:28:01 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.Ob
 import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.OperatorUtils;
 import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
 import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
 import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
 import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -39,12 +40,8 @@ import org.apache.hadoop.mapred.Reporter
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.Arrays;
+import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 
 /**
@@ -57,45 +54,22 @@ import java.util.Map;
  * - Catch and handle errors during execution of the operators.
  *
  */
-public class SparkMapRecordHandler {
+public class SparkMapRecordHandler extends SparkRecordHandler {
 
   private static final String PLAN_KEY = "__MAP_PLAN__";
   private MapOperator mo;
-  private OutputCollector oc;
-  private JobConf jc;
-  private boolean abort = false;
-  private Reporter rp;
   public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
   private boolean done;
 
-  // used to log memory usage periodically
-  public static MemoryMXBean memoryMXBean;
-  private long numRows = 0;
-  private long nextCntr = 1;
   private MapredLocalWork localWork = null;
   private boolean isLogInfoEnabled = false;
 
   private final ExecMapperContext execContext = new ExecMapperContext();
 
   public void init(JobConf job, OutputCollector output, Reporter reporter) {
-    // Allocate the bean at the beginning -
-    memoryMXBean = ManagementFactory.getMemoryMXBean();
-    l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+    super.init(job, output, reporter);
 
     isLogInfoEnabled = l4j.isInfoEnabled();
-
-    try {
-      l4j.info("conf classpath = "
-        + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
-      l4j.info("thread classpath = "
-        + Arrays.asList(((URLClassLoader) Thread.currentThread()
-        .getContextClassLoader()).getURLs()));
-    } catch (Exception e) {
-      l4j.info("cannot get classpath: " + e.getMessage());
-    }
-
-    setDone(false);
-
     ObjectCache cache = ObjectCacheFactory.getCache(job);
 
     try {
@@ -128,11 +102,8 @@ public class SparkMapRecordHandler {
       mo.initializeLocalWork(jc);
       mo.initialize(jc, null);
 
-      oc = output;
-      rp = reporter;
       OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
       mo.setReporter(rp);
-      MapredContext.get().setReporter(reporter);
 
       if (localWork == null) {
         return;
@@ -158,26 +129,17 @@ public class SparkMapRecordHandler {
     }
   }
 
-  public void process(Object value) throws IOException {
+  @Override
+  public void processRow(Object value) throws IOException {
     // reset the execContext for each new row
     execContext.resetRow();
 
     try {
-      if (mo.getDone()) {
-        done = true;
-      } else {
-        // Since there is no concept of a group, we don't invoke
-        // startGroup/endGroup for a mapper
-        mo.process((Writable)value);
-        if (isLogInfoEnabled) {
-          numRows++;
-          if (numRows == nextCntr) {
-            long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-            l4j.info("ExecMapper: processing " + numRows
-              + " rows: used memory = " + used_memory);
-            nextCntr = getNextCntr(numRows);
-          }
-        }
+      // Since there is no concept of a group, we don't invoke
+      // startGroup/endGroup for a mapper
+      mo.process((Writable) value);
+      if (isLogInfoEnabled) {
+        logMemoryInfo();
       }
     } catch (Throwable e) {
       abort = true;
@@ -191,16 +153,9 @@ public class SparkMapRecordHandler {
     }
   }
 
-
-  private long getNextCntr(long cntr) {
-    // A very simple counter to keep track of number of rows processed by the
-    // reducer. It dumps
-    // every 1 million times, and quickly before that
-    if (cntr >= 1000000) {
-      return cntr + 1000000;
-    }
-
-    return 10 * cntr;
+  @Override
+  public void processRow(Object key, Iterator values) throws IOException {
+    throw new UnsupportedOperationException("processRow with a key and value iterator is not supported in SparkMapRecordHandler.");
   }
 
   public void close() {
@@ -229,9 +184,7 @@ public class SparkMapRecordHandler {
       }
 
       if (isLogInfoEnabled) {
-        long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
-        l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
-          + used_memory);
+        logCloseInfo();
       }
 
       ReportStats rps = new ReportStats(rp);
@@ -250,39 +203,6 @@ public class SparkMapRecordHandler {
   }
 
   public  boolean getDone() {
-    return done;
-  }
-
-  public boolean isAbort() {
-    return abort;
-  }
-
-  public void setAbort(boolean abort) {
-    this.abort = abort;
-  }
-
-  public void setDone(boolean done) {
-    this.done = done;
-  }
-
-  /**
-   * reportStats.
-   *
-   */
-  public static class ReportStats implements Operator.OperatorFunc {
-    private final Reporter rp;
-
-    public ReportStats(Reporter rp) {
-      this.rp = rp;
-    }
-
-    public void func(Operator op) {
-      Map<Enum<?>, Long> opStats = op.getStats();
-      for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
-        if (rp != null) {
-          rp.incrCounter(e.getKey(), e.getValue());
-        }
-      }
-    }
+    return mo.getDone();
   }
 }
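
A minimal sketch of how the map-side handler above is driven at runtime, mirroring
HiveMapFunctionResultList.processNextRecord(): init once, call processRow() with the
value half of each (key, value) pair, and close() when the input is exhausted. The
driver class and its parameters are hypothetical; only the handler calls come from the
code in this commit.

    package org.apache.hadoop.hive.ql.exec.spark;

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    import scala.Tuple2;

    // Hypothetical driver for SparkMapRecordHandler (illustration only).
    final class MapHandlerLifecycleSketch {
      static void drive(JobConf jobConf,
          OutputCollector<BytesWritable, BytesWritable> collector,
          Iterator<Tuple2<BytesWritable, BytesWritable>> input) throws IOException {
        SparkMapRecordHandler handler = new SparkMapRecordHandler();
        handler.init(jobConf, collector, Reporter.NULL);
        while (input.hasNext() && !handler.getDone()) {
          // Only the value half of the pair is handed to the operator tree.
          handler.processRow(input.next()._2());
        }
        handler.close();
      }
    }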

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java?rev=1618094&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java Fri Aug 15 04:28:01 2014
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Iterator;
+
+public abstract class SparkRecordHandler {
+  private static final Log LOG = LogFactory.getLog(SparkRecordHandler.class);
+
+  // used to log memory usage periodically
+  protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+
+  protected JobConf jc;
+  protected OutputCollector<?, ?> oc;
+  protected Reporter rp;
+  protected boolean abort = false;
+  private long rowNumber = 0;
+  private long nextLogThreshold = 1;
+
+  public void init(JobConf job, OutputCollector output, Reporter reporter) {
+    jc = job;
+    MapredContext.init(false, new JobConf(jc));
+
+    oc = output;
+    rp = reporter;
+    MapredContext.get().setReporter(reporter);
+
+    LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+
+    try {
+      LOG.info("conf classpath = "
+        + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+      LOG.info("thread classpath = "
+        + Arrays.asList(((URLClassLoader) Thread.currentThread()
+        .getContextClassLoader()).getURLs()));
+    } catch (Exception e) {
+      LOG.info("cannot get classpath: " + e.getMessage());
+    }
+  }
+
+  /**
+   * Process row with single value.
+   */
+  public abstract void processRow(Object value) throws IOException;
+
+  /**
+   * Process row with key and value collection.
+   */
+  public abstract void processRow(Object key, Iterator values) throws IOException;
+
+  /**
+   * Log processed row number and used memory info.
+   */
+  protected void logMemoryInfo() {
+    rowNumber++;
+    if (rowNumber == nextLogThreshold) {
+      long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+      LOG.info("ExecReducer: processing " + rowNumber
+        + " rows: used memory = " + used_memory);
+      nextLogThreshold = getNextLogThreshold(rowNumber);
+    }
+  }
+
+  abstract void close();
+
+  /**
+   * Log the number of processed rows and used memory at the end.
+   */
+  protected void logCloseInfo() {
+    long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+    LOG.info("ExecMapper: processed " + rowNumber + " rows: used memory = "
+      + used_memory);
+  }
+
+  private long getNextLogThreshold(long currentThreshold) {
+    // A very simple counter to keep track of the number of rows processed by
+    // the record handler. It logs every 1 million rows, and more frequently
+    // before that.
+    if (currentThreshold >= 1000000) {
+      return currentThreshold + 1000000;
+    }
+
+    return 10 * currentThreshold;
+  }
+
+  public boolean isAbort() {
+    return abort;
+  }
+
+  public void setAbort(boolean abort) {
+    this.abort = abort;
+  }
+}
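
The abstract base class above defines the template every handler follows: the base
init() wires up jc, oc, rp and the MapredContext, subclasses implement the two
processRow() overloads and close(), and logMemoryInfo()/logCloseInfo() provide the
shared row-count and heap logging. A hypothetical do-nothing subclass, shown only to
illustrate that contract:

    package org.apache.hadoop.hive.ql.exec.spark;

    import java.io.IOException;
    import java.util.Iterator;

    // Hypothetical handler (not part of this commit) illustrating the
    // SparkRecordHandler contract.
    public class NoopRecordHandler extends SparkRecordHandler {

      @Override
      public void processRow(Object value) throws IOException {
        // Map-style input: one value per record.
        logMemoryInfo(); // periodic row-count and heap logging from the base class
      }

      @Override
      public void processRow(Object key, Iterator values) throws IOException {
        // Reduce-style input: a key plus all values grouped under it.
        while (values.hasNext()) {
          values.next();
          logMemoryInfo();
        }
      }

      @Override
      public void close() {
        logCloseInfo(); // final row-count and heap log from the base class
      }
    }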

Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1618094&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Fri Aug 15 04:28:01 2014
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Cloned from ExecReducer, this is the bridge between the Spark framework and
+ * the Hive operator pipeline at execution time. Its main responsibilities are:
+ *
+ * - Load and set up the operator pipeline from XML
+ * - Run the pipeline by transforming key/value pairs into records and forwarding them to the operators
+ * - Send start and end group messages to separate records with the same key from one another
+ * - Catch and handle errors during execution of the operators.
+ *
+ */
+public class SparkReduceRecordHandler extends SparkRecordHandler {
+
+  private static final Log LOG = LogFactory.getLog(SparkReduceRecordHandler.class);
+  private static final String PLAN_KEY = "__REDUCE_PLAN__";
+
+  // Input value serde needs to be an array to support different SerDe
+  // for different tags
+  private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
+  private final Object[] valueObject = new Object[Byte.MAX_VALUE];
+  private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+  private final boolean isLogInfoEnabled = LOG.isInfoEnabled();
+
+  // TODO: move to DynamicSerDe when it's ready
+  private Deserializer inputKeyDeserializer;
+  private Operator<?> reducer;
+  private boolean isTagged = false;
+  private TableDesc keyTableDesc;
+  private TableDesc[] valueTableDesc;
+  private ObjectInspector[] rowObjectInspector;
+
+  // runtime objects
+  private transient Object keyObject;
+  private transient BytesWritable groupKey;
+
+  public void init(JobConf job, OutputCollector output, Reporter reporter) {
+    super.init(job, output, reporter);
+
+    rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+    ObjectInspector keyObjectInspector;
+
+    ObjectCache cache = ObjectCacheFactory.getCache(jc);
+    ReduceWork gWork = (ReduceWork) cache.retrieve(PLAN_KEY);
+    if (gWork == null) {
+      gWork = Utilities.getReduceWork(job);
+      cache.cache(PLAN_KEY, gWork);
+    } else {
+      Utilities.setReduceWork(job, gWork);
+    }
+
+    reducer = gWork.getReducer();
+    // Clear out any parents, as the reducer is the root of the operator tree.
+    reducer.setParentOperators(null);
+    isTagged = gWork.getNeedsTagging();
+    try {
+      keyTableDesc = gWork.getKeyDesc();
+      inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+        .getDeserializerClass(), null);
+      SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
+      keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+      valueTableDesc = new TableDesc[gWork.getTagToValueDesc().size()];
+      for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
+        // We should initialize the SerDe with the TypeInfo when available.
+        valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
+        inputValueDeserializer[tag] = ReflectionUtils.newInstance(
+          valueTableDesc[tag].getDeserializerClass(), null);
+        SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
+          valueTableDesc[tag].getProperties(), null);
+        valueObjectInspector[tag] = inputValueDeserializer[tag]
+          .getObjectInspector();
+
+        ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+        ois.add(keyObjectInspector);
+        ois.add(valueObjectInspector[tag]);
+        reducer.setGroupKeyObjectInspector(keyObjectInspector);
+        rowObjectInspector[tag] = ObjectInspectorFactory
+          .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
+      }
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+
+    reducer.setReporter(rp);
+
+    // initialize reduce operator tree
+    try {
+      LOG.info(reducer.dump(0));
+      reducer.initialize(jc, rowObjectInspector);
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        throw new RuntimeException("Reduce operator initialization failed", e);
+      }
+    }
+  }
+
+  @Override
+  public void processRow(Object value) throws IOException {
+    throw new UnsupportedOperationException("processRow with a single value is not supported in SparkReduceRecordHandler.");
+  }
+
+  @Override
+  public void processRow(Object key, Iterator values) throws IOException {
+    if (reducer.getDone()) {
+      return;
+    }
+
+    try {
+      BytesWritable keyWritable = (BytesWritable) key;
+      byte tag = 0;
+      if (isTagged) {
+        // remove the tag from key coming out of reducer
+        // and store it in separate variable.
+        int size = keyWritable.getSize() - 1;
+        tag = keyWritable.get()[size];
+        keyWritable.setSize(size);
+      }
+
+      if (!keyWritable.equals(groupKey)) {
+        // If an operator wants to do some work at the beginning of a group
+        if (groupKey == null) { // the first group
+          groupKey = new BytesWritable();
+        } else {
+          // If an operator wants to do some work at the end of a group
+          LOG.trace("End Group");
+          reducer.endGroup();
+        }
+
+        try {
+          keyObject = inputKeyDeserializer.deserialize(keyWritable);
+        } catch (Exception e) {
+          throw new HiveException(
+            "Hive Runtime Error: Unable to deserialize reduce input key from "
+              + Utilities.formatBinaryString(keyWritable.get(), 0,
+              keyWritable.getSize()) + " with properties "
+              + keyTableDesc.getProperties(), e);
+        }
+
+        groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+        LOG.trace("Start Group");
+        reducer.setGroupKeyObject(keyObject);
+        reducer.startGroup();
+      }
+      // System.err.print(keyObject.toString());
+      while (values.hasNext()) {
+        BytesWritable valueWritable = (BytesWritable) values.next();
+        // System.err.print(who.getHo().toString());
+        try {
+          valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
+        } catch (SerDeException e) {
+          throw new HiveException(
+            "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+              + tag
+              + ") from "
+              + Utilities.formatBinaryString(valueWritable.get(), 0,
+              valueWritable.getSize()) + " with properties "
+              + valueTableDesc[tag].getProperties(), e);
+        }
+        row.clear();
+        row.add(keyObject);
+        row.add(valueObject[tag]);
+        if (isLogInfoEnabled) {
+          logMemoryInfo();
+        }
+        try {
+          reducer.processOp(row, tag);
+        } catch (Exception e) {
+          String rowString = null;
+          try {
+            rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+          } catch (Exception e2) {
+            rowString = "[Error getting row data with exception " +
+              StringUtils.stringifyException(e2) + " ]";
+          }
+          throw new HiveException("Hive Runtime Error while processing row (tag="
+            + tag + ") " + rowString, e);
+        }
+      }
+
+    } catch (Throwable e) {
+      abort = true;
+      if (e instanceof OutOfMemoryError) {
+        // Don't create a new object if we are already out of memory
+        throw (OutOfMemoryError) e;
+      } else {
+        LOG.fatal(StringUtils.stringifyException(e));
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public void close() {
+
+    // No row was processed
+    if (oc == null) {
+      LOG.trace("Close called without any rows processed");
+    }
+
+    try {
+      if (groupKey != null) {
+        // If an operator wants to do some work at the end of a group
+        LOG.trace("End Group");
+        reducer.endGroup();
+      }
+      if (isLogInfoEnabled) {
+        logCloseInfo();
+      }
+
+      reducer.close(abort);
+      ReportStats rps = new ReportStats(rp);
+      reducer.preorderMap(rps);
+
+    } catch (Exception e) {
+      if (!abort) {
+        // signal new failure to map-reduce
+        LOG.error("Hit error while closing operators - failing tree");
+        throw new RuntimeException("Hive Runtime Error while closing operators: "
+          + e.getMessage(), e);
+      }
+    } finally {
+      MapredContext.close();
+      Utilities.clearWorkMap();
+    }
+  }
+
+  public Operator<?> getReducer() {
+    return reducer;
+  }
+}
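
A minimal sketch of how HiveReduceFunctionResultList drives the reduce handler above
(compare processNextRecord() and closeRecordProcessor() earlier in this commit): init
once, feed each shuffled (key, values) pair to processRow(), then close(). The driver
class and the "shuffled" iterator are assumptions for illustration.

    package org.apache.hadoop.hive.ql.exec.spark;

    import java.io.IOException;
    import java.util.Iterator;

    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    import scala.Tuple2;

    // Hypothetical driver for SparkReduceRecordHandler (illustration only).
    final class ReduceHandlerLifecycleSketch {
      static void drive(JobConf jobConf,
          OutputCollector<BytesWritable, BytesWritable> collector,
          Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> shuffled)
          throws IOException {
        SparkReduceRecordHandler handler = new SparkReduceRecordHandler();
        handler.init(jobConf, collector, Reporter.NULL);
        while (shuffled.hasNext()) {
          Tuple2<BytesWritable, Iterable<BytesWritable>> record = shuffled.next();
          // The key goes in as-is; grouped values are iterated by the handler.
          handler.processRow(record._1(), record._2().iterator());
        }
        handler.close(); // ends the last group, closes operators, reports stats
      }
    }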