Posted to commits@hive.apache.org by br...@apache.org on 2014/08/15 06:28:01 UTC
svn commit: r1618094 -
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/
Author: brock
Date: Fri Aug 15 04:28:01 2014
New Revision: 1618094
URL: http://svn.apache.org/r1618094
Log:
HIVE-7677 - Implement native HiveReduceFunction (Chengxiang Li via Brock) [Spark Branch]
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
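
Note (not part of the original commit mail): the change described in the log above extracts the plumbing shared by the Spark map and reduce sides into an abstract SparkRecordHandler, and replaces the reduce side's use of ExecReducer with a native SparkReduceRecordHandler. The snippet below is only an illustrative, self-contained sketch of that lifecycle (init, repeated processRow calls, close) using simplified stand-in types; it is not the actual Hive/Spark code, whose real classes appear in the diffs that follow.

import java.io.IOException;
import java.util.Arrays;
import java.util.Iterator;

// Simplified stand-in for SparkRecordHandler: a template-method base class.
abstract class RecordHandlerSketch {
  protected boolean abort = false;
  private long rowNumber = 0;

  public void init(String conf) {
    // The real init also captures the JobConf, OutputCollector and Reporter.
    System.out.println("init with " + conf);
  }

  // Map-side entry point: one value per call.
  public abstract void processRow(Object value) throws IOException;

  // Reduce-side entry point: a key plus all of its grouped values.
  public abstract void processRow(Object key, Iterator<Object> values) throws IOException;

  protected void logRow() {
    rowNumber++;
  }

  public abstract void close();
}

// Simplified stand-in for SparkReduceRecordHandler.
class ReduceHandlerSketch extends RecordHandlerSketch {
  @Override
  public void processRow(Object value) {
    throw new UnsupportedOperationException("single-value rows are a map-side concept");
  }

  @Override
  public void processRow(Object key, Iterator<Object> values) {
    while (values.hasNext()) {
      logRow();
      System.out.println(key + " -> " + values.next());
    }
  }

  @Override
  public void close() {
    System.out.println("closed, abort=" + abort);
  }

  public static void main(String[] args) throws IOException {
    RecordHandlerSketch handler = new ReduceHandlerSketch();
    handler.init("job.conf");
    handler.processRow("key1", Arrays.<Object>asList("v1", "v2").iterator());
    handler.close();
  }
}
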
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1618094&r1=1618093&r2=1618094&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Fri Aug 15 04:28:01 2014
@@ -44,7 +44,7 @@ public class HiveMapFunctionResultList e
@Override
protected void processNextRecord(Tuple2<BytesWritable, BytesWritable> inputRecord)
throws IOException {
- recordHandler.process(inputRecord._2());
+ recordHandler.processRow(inputRecord._2());
}
@Override
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java?rev=1618094&r1=1618093&r2=1618094&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunction.java Fri Aug 15 04:28:01 2014
@@ -48,8 +48,10 @@ BytesWritable, BytesWritable> {
jobConf.set("mapred.reducer.class", ExecReducer.class.getName());
}
- ExecReducer reducer = new ExecReducer();
- reducer.configure(jobConf);
- return new HiveReduceFunctionResultList(jobConf, it, reducer);
+ SparkReduceRecordHandler reducerRecordhandler = new SparkReduceRecordHandler();
+ HiveReduceFunctionResultList result = new HiveReduceFunctionResultList(jobConf, it, reducerRecordhandler);
+ reducerRecordhandler.init(jobConf, result, Reporter.NULL);
+
+ return result;
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java?rev=1618094&r1=1618093&r2=1618094&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveReduceFunctionResultList.java Fri Aug 15 04:28:01 2014
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.exec.s
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
-import org.apache.hadoop.hive.ql.exec.mr.ExecReducer;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Reporter;
@@ -32,7 +31,7 @@ import java.util.Iterator;
public class HiveReduceFunctionResultList extends
HiveBaseFunctionResultList<Tuple2<BytesWritable, Iterable<BytesWritable>>> {
- private final ExecReducer reducer;
+ private final SparkReduceRecordHandler reduceRecordHandler;
/**
* Instantiate result set Iterable for Reduce function output.
@@ -42,16 +41,16 @@ public class HiveReduceFunctionResultLis
*/
public HiveReduceFunctionResultList(Configuration conf,
Iterator<Tuple2<BytesWritable, Iterable<BytesWritable>>> inputIterator,
- ExecReducer reducer) {
+ SparkReduceRecordHandler reducer) {
super(conf, inputIterator);
- this.reducer = reducer;
+ this.reduceRecordHandler = reducer;
setOutputCollector();
}
@Override
protected void processNextRecord(Tuple2<BytesWritable, Iterable<BytesWritable>> inputRecord)
throws IOException {
- reducer.reduce(inputRecord._1(), inputRecord._2().iterator(), this, Reporter.NULL);
+ reduceRecordHandler.processRow(inputRecord._1(), inputRecord._2().iterator());
}
@Override
@@ -61,13 +60,13 @@ public class HiveReduceFunctionResultLis
@Override
protected void closeRecordProcessor() {
- reducer.close();
+ reduceRecordHandler.close();
}
private void setOutputCollector() {
- if (reducer != null && reducer.getReducer() != null) {
+ if (reduceRecordHandler != null && reduceRecordHandler.getReducer() != null) {
OperatorUtils.setChildrenCollector(
- Arrays.<Operator<? extends OperatorDesc>>asList(reducer.getReducer()), this);
+ Arrays.<Operator<? extends OperatorDesc>>asList(reduceRecordHandler.getReducer()), this);
}
}
}
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1618094&r1=1618093&r2=1618094&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Fri Aug 15 04:28:01 2014
@@ -27,6 +27,7 @@ import org.apache.hadoop.hive.ql.exec.Ob
import org.apache.hadoop.hive.ql.exec.Operator;
import org.apache.hadoop.hive.ql.exec.OperatorUtils;
import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
import org.apache.hadoop.hive.ql.plan.MapWork;
@@ -39,12 +40,8 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.StringUtils;
import java.io.IOException;
-import java.lang.management.ManagementFactory;
-import java.lang.management.MemoryMXBean;
-import java.net.URLClassLoader;
-import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
-import java.util.Map;
/**
@@ -57,45 +54,22 @@ import java.util.Map;
* - Catch and handle errors during execution of the operators.
*
*/
-public class SparkMapRecordHandler {
+public class SparkMapRecordHandler extends SparkRecordHandler{
private static final String PLAN_KEY = "__MAP_PLAN__";
private MapOperator mo;
- private OutputCollector oc;
- private JobConf jc;
- private boolean abort = false;
- private Reporter rp;
public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
private boolean done;
- // used to log memory usage periodically
- public static MemoryMXBean memoryMXBean;
- private long numRows = 0;
- private long nextCntr = 1;
private MapredLocalWork localWork = null;
private boolean isLogInfoEnabled = false;
private final ExecMapperContext execContext = new ExecMapperContext();
public void init(JobConf job, OutputCollector output, Reporter reporter) {
- // Allocate the bean at the beginning -
- memoryMXBean = ManagementFactory.getMemoryMXBean();
- l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+ super.init(job, output, reporter);
isLogInfoEnabled = l4j.isInfoEnabled();
-
- try {
- l4j.info("conf classpath = "
- + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
- l4j.info("thread classpath = "
- + Arrays.asList(((URLClassLoader) Thread.currentThread()
- .getContextClassLoader()).getURLs()));
- } catch (Exception e) {
- l4j.info("cannot get classpath: " + e.getMessage());
- }
-
- setDone(false);
-
ObjectCache cache = ObjectCacheFactory.getCache(job);
try {
@@ -128,11 +102,8 @@ public class SparkMapRecordHandler {
mo.initializeLocalWork(jc);
mo.initialize(jc, null);
- oc = output;
- rp = reporter;
OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
mo.setReporter(rp);
- MapredContext.get().setReporter(reporter);
if (localWork == null) {
return;
@@ -158,26 +129,17 @@ public class SparkMapRecordHandler {
}
}
- public void process(Object value) throws IOException {
+ @Override
+ public void processRow(Object value) throws IOException {
// reset the execContext for each new row
execContext.resetRow();
try {
- if (mo.getDone()) {
- done = true;
- } else {
- // Since there is no concept of a group, we don't invoke
- // startGroup/endGroup for a mapper
- mo.process((Writable)value);
- if (isLogInfoEnabled) {
- numRows++;
- if (numRows == nextCntr) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processing " + numRows
- + " rows: used memory = " + used_memory);
- nextCntr = getNextCntr(numRows);
- }
- }
+ // Since there is no concept of a group, we don't invoke
+ // startGroup/endGroup for a mapper
+ mo.process((Writable) value);
+ if (isLogInfoEnabled) {
+ logMemoryInfo();
}
} catch (Throwable e) {
abort = true;
@@ -191,16 +153,9 @@ public class SparkMapRecordHandler {
}
}
-
- private long getNextCntr(long cntr) {
- // A very simple counter to keep track of number of rows processed by the
- // reducer. It dumps
- // every 1 million times, and quickly before that
- if (cntr >= 1000000) {
- return cntr + 1000000;
- }
-
- return 10 * cntr;
+ @Override
+ public void processRow(Object key, Iterator values) throws IOException {
+ throw new UnsupportedOperationException("Do not support this method in SparkMapRecordHandler.");
}
public void close() {
@@ -229,9 +184,7 @@ public class SparkMapRecordHandler {
}
if (isLogInfoEnabled) {
- long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
- l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
- + used_memory);
+ logCloseInfo();
}
ReportStats rps = new ReportStats(rp);
@@ -250,39 +203,6 @@ public class SparkMapRecordHandler {
}
public boolean getDone() {
- return done;
- }
-
- public boolean isAbort() {
- return abort;
- }
-
- public void setAbort(boolean abort) {
- this.abort = abort;
- }
-
- public void setDone(boolean done) {
- this.done = done;
- }
-
- /**
- * reportStats.
- *
- */
- public static class ReportStats implements Operator.OperatorFunc {
- private final Reporter rp;
-
- public ReportStats(Reporter rp) {
- this.rp = rp;
- }
-
- public void func(Operator op) {
- Map<Enum<?>, Long> opStats = op.getStats();
- for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
- if (rp != null) {
- rp.incrCounter(e.getKey(), e.getValue());
- }
- }
- }
+ return mo.getDone();
}
}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java?rev=1618094&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkRecordHandler.java Fri Aug 15 04:28:01 2014
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.Iterator;
+
+public abstract class SparkRecordHandler {
+ private static final Log LOG = LogFactory.getLog(SparkRecordHandler.class);
+
+ // used to log memory usage periodically
+ protected final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
+
+ protected JobConf jc;
+ protected OutputCollector<?, ?> oc;
+ protected Reporter rp;
+ protected boolean abort = false;
+ private long rowNumber = 0;
+ private long nextLogThreshold = 1;
+
+ public void init(JobConf job, OutputCollector output, Reporter reporter) {
+ jc = job;
+ MapredContext.init(false, new JobConf(jc));
+
+ oc = output;
+ rp = reporter;
+ MapredContext.get().setReporter(reporter);
+
+ LOG.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+
+ try {
+ LOG.info("conf classpath = "
+ + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+ LOG.info("thread classpath = "
+ + Arrays.asList(((URLClassLoader) Thread.currentThread()
+ .getContextClassLoader()).getURLs()));
+ } catch (Exception e) {
+ LOG.info("cannot get classpath: " + e.getMessage());
+ }
+ }
+
+ /**
+ * Process row with single value.
+ */
+ public abstract void processRow(Object value) throws IOException;
+
+ /**
+ * Process row with key and value collection.
+ */
+ public abstract void processRow(Object key, Iterator values) throws IOException;
+
+ /**
+ * Log processed row number and used memory info.
+ */
+ protected void logMemoryInfo() {
+ rowNumber++;
+ if (rowNumber == nextLogThreshold) {
+ long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ LOG.info("ExecReducer: processing " + rowNumber
+ + " rows: used memory = " + used_memory);
+ nextLogThreshold = getNextLogThreshold(rowNumber);
+ }
+ }
+
+ abstract void close();
+
+ /**
+ * Log information to be logged at the end
+ */
+ protected void logCloseInfo() {
+ long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ LOG.info("ExecMapper: processed " + rowNumber + " rows: used memory = "
+ + used_memory);
+ }
+
+ private long getNextLogThreshold(long currentThreshold) {
+ // A very simple counter to keep track of number of rows processed by the
+ // reducer. It dumps
+ // every 1 million times, and quickly before that
+ if (currentThreshold >= 1000000) {
+ return currentThreshold + 1000000;
+ }
+
+ return 10 * currentThreshold;
+ }
+
+ public boolean isAbort() {
+ return abort;
+ }
+
+ public void setAbort(boolean abort) {
+ this.abort = abort;
+ }
+}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java?rev=1618094&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java Fri Aug 15 04:28:01 2014
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapper.ReportStats;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.plan.ReduceWork;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.Deserializer;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.MapReduceBase;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reducer;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Cloned from ExecReducer, this is the bridge between the Spark framework and
+ * the Hive operator pipeline at execution time. Its main responsibilities are:
+ *
+ * - Load and setup the operator pipeline from XML
+ * - Run the pipeline by transforming key, value pairs to records and forwarding them to the operators
+ * - Sending start and end group messages to separate records with same key from one another
+ * - Catch and handle errors during execution of the operators.
+ *
+ */
+public class SparkReduceRecordHandler extends SparkRecordHandler{
+
+ private static final Log LOG = LogFactory.getLog(SparkReduceRecordHandler.class);
+ private static final String PLAN_KEY = "__REDUCE_PLAN__";
+
+ // Input value serde needs to be an array to support different SerDe
+ // for different tags
+ private final Deserializer[] inputValueDeserializer = new Deserializer[Byte.MAX_VALUE];
+ private final Object[] valueObject = new Object[Byte.MAX_VALUE];
+ private final List<Object> row = new ArrayList<Object>(Utilities.reduceFieldNameList.size());
+ private final boolean isLogInfoEnabled = LOG.isInfoEnabled();
+
+ // TODO: move to DynamicSerDe when it's ready
+ private Deserializer inputKeyDeserializer;
+ private Operator<?> reducer;
+ private boolean isTagged = false;
+ private TableDesc keyTableDesc;
+ private TableDesc[] valueTableDesc;
+ private ObjectInspector[] rowObjectInspector;
+
+ // runtime objects
+ private transient Object keyObject;
+ private transient BytesWritable groupKey;
+
+ public void init(JobConf job, OutputCollector output, Reporter reporter) {
+ super.init(job, output, reporter);
+
+ rowObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+ ObjectInspector[] valueObjectInspector = new ObjectInspector[Byte.MAX_VALUE];
+ ObjectInspector keyObjectInspector;
+
+ ObjectCache cache = ObjectCacheFactory.getCache(jc);
+ ReduceWork gWork = (ReduceWork) cache.retrieve(PLAN_KEY);
+ if (gWork == null) {
+ gWork = Utilities.getReduceWork(job);
+ cache.cache(PLAN_KEY, gWork);
+ } else {
+ Utilities.setReduceWork(job, gWork);
+ }
+
+ reducer = gWork.getReducer();
+ reducer.setParentOperators(null); // clear out any parents as reducer is the
+ // root
+ isTagged = gWork.getNeedsTagging();
+ try {
+ keyTableDesc = gWork.getKeyDesc();
+ inputKeyDeserializer = ReflectionUtils.newInstance(keyTableDesc
+ .getDeserializerClass(), null);
+ SerDeUtils.initializeSerDe(inputKeyDeserializer, null, keyTableDesc.getProperties(), null);
+ keyObjectInspector = inputKeyDeserializer.getObjectInspector();
+ valueTableDesc = new TableDesc[gWork.getTagToValueDesc().size()];
+ for (int tag = 0; tag < gWork.getTagToValueDesc().size(); tag++) {
+ // We should initialize the SerDe with the TypeInfo when available.
+ valueTableDesc[tag] = gWork.getTagToValueDesc().get(tag);
+ inputValueDeserializer[tag] = ReflectionUtils.newInstance(
+ valueTableDesc[tag].getDeserializerClass(), null);
+ SerDeUtils.initializeSerDe(inputValueDeserializer[tag], null,
+ valueTableDesc[tag].getProperties(), null);
+ valueObjectInspector[tag] = inputValueDeserializer[tag]
+ .getObjectInspector();
+
+ ArrayList<ObjectInspector> ois = new ArrayList<ObjectInspector>();
+ ois.add(keyObjectInspector);
+ ois.add(valueObjectInspector[tag]);
+ reducer.setGroupKeyObjectInspector(keyObjectInspector);
+ rowObjectInspector[tag] = ObjectInspectorFactory
+ .getStandardStructObjectInspector(Utilities.reduceFieldNameList, ois);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ reducer.setReporter(rp);
+
+ // initialize reduce operator tree
+ try {
+ LOG.info(reducer.dump(0));
+ reducer.initialize(jc, rowObjectInspector);
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ throw new RuntimeException("Reduce operator initialization failed", e);
+ }
+ }
+ }
+
+ @Override
+ public void processRow(Object value) throws IOException {
+ throw new UnsupportedOperationException("Do not support this method in SparkReduceRecordHandler.");
+ }
+
+ @Override
+ public void processRow(Object key, Iterator values) throws IOException {
+ if (reducer.getDone()) {
+ return;
+ }
+
+ try {
+ BytesWritable keyWritable = (BytesWritable) key;
+ byte tag = 0;
+ if (isTagged) {
+ // remove the tag from key coming out of reducer
+ // and store it in separate variable.
+ int size = keyWritable.getSize() - 1;
+ tag = keyWritable.get()[size];
+ keyWritable.setSize(size);
+ }
+
+ if (!keyWritable.equals(groupKey)) {
+ // If a operator wants to do some work at the beginning of a group
+ if (groupKey == null) { // the first group
+ groupKey = new BytesWritable();
+ } else {
+ // If a operator wants to do some work at the end of a group
+ LOG.trace("End Group");
+ reducer.endGroup();
+ }
+
+ try {
+ keyObject = inputKeyDeserializer.deserialize(keyWritable);
+ } catch (Exception e) {
+ throw new HiveException(
+ "Hive Runtime Error: Unable to deserialize reduce input key from "
+ + Utilities.formatBinaryString(keyWritable.get(), 0,
+ keyWritable.getSize()) + " with properties "
+ + keyTableDesc.getProperties(), e);
+ }
+
+ groupKey.set(keyWritable.get(), 0, keyWritable.getSize());
+ LOG.trace("Start Group");
+ reducer.setGroupKeyObject(keyObject);
+ reducer.startGroup();
+ }
+ // System.err.print(keyObject.toString());
+ while (values.hasNext()) {
+ BytesWritable valueWritable = (BytesWritable) values.next();
+ // System.err.print(who.getHo().toString());
+ try {
+ valueObject[tag] = inputValueDeserializer[tag].deserialize(valueWritable);
+ } catch (SerDeException e) {
+ throw new HiveException(
+ "Hive Runtime Error: Unable to deserialize reduce input value (tag="
+ + tag
+ + ") from "
+ + Utilities.formatBinaryString(valueWritable.get(), 0,
+ valueWritable.getSize()) + " with properties "
+ + valueTableDesc[tag].getProperties(), e);
+ }
+ row.clear();
+ row.add(keyObject);
+ row.add(valueObject[tag]);
+ if (isLogInfoEnabled) {
+ logMemoryInfo();
+ }
+ try {
+ reducer.processOp(row, tag);
+ } catch (Exception e) {
+ String rowString = null;
+ try {
+ rowString = SerDeUtils.getJSONString(row, rowObjectInspector[tag]);
+ } catch (Exception e2) {
+ rowString = "[Error getting row data with exception " +
+ StringUtils.stringifyException(e2) + " ]";
+ }
+ throw new HiveException("Hive Runtime Error while processing row (tag="
+ + tag + ") " + rowString, e);
+ }
+ }
+
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ LOG.fatal(StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void close() {
+
+ // No row was processed
+ if (oc == null) {
+ LOG.trace("Close called without any rows processed");
+ }
+
+ try {
+ if (groupKey != null) {
+ // If a operator wants to do some work at the end of a group
+ LOG.trace("End Group");
+ reducer.endGroup();
+ }
+ if (isLogInfoEnabled) {
+ logCloseInfo();
+ }
+
+ reducer.close(abort);
+ ReportStats rps = new ReportStats(rp);
+ reducer.preorderMap(rps);
+
+ } catch (Exception e) {
+ if (!abort) {
+ // signal new failure to map-reduce
+ LOG.error("Hit error while closing operators - failing tree");
+ throw new RuntimeException("Hive Runtime Error while closing operators: "
+ + e.getMessage(), e);
+ }
+ } finally {
+ MapredContext.close();
+ Utilities.clearWorkMap();
+ }
+ }
+
+ public Operator<?> getReducer() {
+ return reducer;
+ }
+}