Posted to commits@hive.apache.org by br...@apache.org on 2014/08/13 19:23:31 UTC
svn commit: r1617772 - in
/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark:
HiveMapFunction.java HiveMapFunctionResultList.java
SparkMapRecordHandler.java
Author: brock
Date: Wed Aug 13 17:23:31 2014
New Revision: 1617772
URL: http://svn.apache.org/r1617772
Log:
HIVE-7643 - ExecMapper static states lead to unpredictable query result. (Chengxiang Li via Brock) [Spark Branch]
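For context: ExecMapper tracks completion in static fields (the old code below calls ExecMapper.getDone() statically), so when several map tasks share one Spark executor JVM, one task finishing can flip the flag for every other task. SparkMapRecordHandler keeps that state per instance. A minimal, self-contained sketch of the difference (the classes below are illustrative only and not part of this commit):

    public class StaticStateDemo {
      // JVM-wide flag: every task in the executor JVM sees the same value
      // (stand-in for the old static ExecMapper.getDone()).
      static class SharedFlagMapper {
        private static boolean done = false;
        void finish() { done = true; }
        static boolean getDone() { return done; }
      }

      // Per-instance flag: each task owns its own state, as SparkMapRecordHandler does.
      static class PerInstanceMapper {
        private boolean done = false;
        void finish() { done = true; }
        boolean getDone() { return done; }
      }

      public static void main(String[] args) {
        SharedFlagMapper taskA = new SharedFlagMapper();
        SharedFlagMapper taskB = new SharedFlagMapper();
        taskA.finish();
        // taskB never finished, yet the shared flag already reports "done".
        System.out.println("static flag seen by taskB: " + SharedFlagMapper.getDone()); // true

        PerInstanceMapper taskC = new PerInstanceMapper();
        PerInstanceMapper taskD = new PerInstanceMapper();
        taskC.finish();
        System.out.println("taskD done? " + taskD.getDone()); // false
      }
    }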
Added:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
Modified:
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java?rev=1617772&r1=1617771&r2=1617772&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunction.java Wed Aug 13 17:23:31 2014
@@ -20,10 +20,8 @@ package org.apache.hadoop.hive.ql.exec.s
import java.util.Iterator;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.Reporter;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import scala.Tuple2;
@@ -47,7 +45,7 @@ BytesWritable, BytesWritable> {
jobConf = KryoSerializer.deserializeJobConf(this.buffer);
}
- ExecMapper mapper = new ExecMapper();
+ SparkMapRecordHandler mapper = new SparkMapRecordHandler();
mapper.configure(jobConf);
return new HiveMapFunctionResultList(jobConf, it, mapper);
Modified: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java?rev=1617772&r1=1617771&r2=1617772&view=diff
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java (original)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HiveMapFunctionResultList.java Wed Aug 13 17:23:31 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.hive.ql.exec.spark;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hive.ql.exec.mr.ExecMapper;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.Reporter;
import scala.Tuple2;
@@ -28,33 +27,33 @@ import java.util.Iterator;
public class HiveMapFunctionResultList extends
HiveBaseFunctionResultList<Tuple2<BytesWritable, BytesWritable>> {
- private final ExecMapper mapper;
+ private final SparkMapRecordHandler recordHandler;
/**
* Instantiate result set Iterable for Map function output.
*
* @param inputIterator Input record iterator.
- * @param mapper Initialized {@link org.apache.hadoop.hive.ql.exec.mr.ExecMapper} instance.
+ * @param handler Initialized {@link SparkMapRecordHandler} instance.
*/
public HiveMapFunctionResultList(Configuration conf,
- Iterator<Tuple2<BytesWritable, BytesWritable>> inputIterator, ExecMapper mapper) {
+ Iterator<Tuple2<BytesWritable, BytesWritable>> inputIterator, SparkMapRecordHandler handler) {
super(conf, inputIterator);
- this.mapper = mapper;
+ recordHandler = handler;
}
@Override
protected void processNextRecord(Tuple2<BytesWritable, BytesWritable> inputRecord)
throws IOException {
- mapper.map(inputRecord._1(), inputRecord._2(), this, Reporter.NULL);
+ recordHandler.map(inputRecord._1(), inputRecord._2(), this, Reporter.NULL);
}
@Override
protected boolean processingDone() {
- return ExecMapper.getDone();
+ return recordHandler.getDone();
}
@Override
protected void closeRecordProcessor() {
- mapper.close();
+ recordHandler.close();
}
}
Added: hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java
URL: http://svn.apache.org/viewvc/hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java?rev=1617772&view=auto
==============================================================================
--- hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java (added)
+++ hive/branches/spark/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkMapRecordHandler.java Wed Aug 13 17:23:31 2014
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.ql.exec.MapOperator;
+import org.apache.hadoop.hive.ql.exec.MapredContext;
+import org.apache.hadoop.hive.ql.exec.ObjectCache;
+import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.OperatorUtils;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext;
+import org.apache.hadoop.hive.ql.exec.vector.VectorMapOperator;
+import org.apache.hadoop.hive.ql.plan.MapWork;
+import org.apache.hadoop.hive.ql.plan.MapredLocalWork;
+import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Cloned from ExecMapper. SparkMapRecordHandler is the bridge between the Spark framework and
+ * the Hive operator pipeline at execution time. Its main responsibilities are:
+ *
+ * - Load and setup the operator pipeline from XML
+ * - Run the pipeline by transforming key value pairs to records and forwarding them to the operators
+ * - Stop execution when the "limit" is reached
+ * - Catch and handle errors during execution of the operators.
+ *
+ */
+public class SparkMapRecordHandler {
+
+ private static final String PLAN_KEY = "__MAP_PLAN__";
+ private MapOperator mo;
+ private OutputCollector oc;
+ private JobConf jc;
+ private boolean abort = false;
+ private Reporter rp;
+ public static final Log l4j = LogFactory.getLog(SparkMapRecordHandler.class);
+ private boolean done;
+
+ // used to log memory usage periodically
+ public static MemoryMXBean memoryMXBean;
+ private long numRows = 0;
+ private long nextCntr = 1;
+ private MapredLocalWork localWork = null;
+ private boolean isLogInfoEnabled = false;
+
+ private final ExecMapperContext execContext = new ExecMapperContext();
+
+ public void configure(JobConf job) {
+ // Allocate the bean at the beginning -
+ memoryMXBean = ManagementFactory.getMemoryMXBean();
+ l4j.info("maximum memory = " + memoryMXBean.getHeapMemoryUsage().getMax());
+
+ isLogInfoEnabled = l4j.isInfoEnabled();
+
+ try {
+ l4j.info("conf classpath = "
+ + Arrays.asList(((URLClassLoader) job.getClassLoader()).getURLs()));
+ l4j.info("thread classpath = "
+ + Arrays.asList(((URLClassLoader) Thread.currentThread()
+ .getContextClassLoader()).getURLs()));
+ } catch (Exception e) {
+ l4j.info("cannot get classpath: " + e.getMessage());
+ }
+
+ setDone(false);
+
+ ObjectCache cache = ObjectCacheFactory.getCache(job);
+
+ try {
+ jc = job;
+ execContext.setJc(jc);
+ // create map and fetch operators
+ MapWork mrwork = (MapWork) cache.retrieve(PLAN_KEY);
+ if (mrwork == null) {
+ mrwork = Utilities.getMapWork(job);
+ cache.cache(PLAN_KEY, mrwork);
+ } else {
+ Utilities.setMapWork(job, mrwork);
+ }
+ if (mrwork.getVectorMode()) {
+ mo = new VectorMapOperator();
+ } else {
+ mo = new MapOperator();
+ }
+ mo.setConf(mrwork);
+ // initialize map operator
+ mo.setChildren(job);
+ l4j.info(mo.dump(0));
+ // initialize map local work
+ localWork = mrwork.getMapLocalWork();
+ execContext.setLocalWork(localWork);
+
+ MapredContext.init(true, new JobConf(jc));
+
+ mo.setExecContext(execContext);
+ mo.initializeLocalWork(jc);
+ mo.initialize(jc, null);
+
+ if (localWork == null) {
+ return;
+ }
+
+ //The following code is for mapjoin
+ //initialize all the dummy ops
+ l4j.info("Initializing dummy operator");
+ List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ dummyOp.setExecContext(execContext);
+ dummyOp.initialize(jc,null);
+ }
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // will this be true here?
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ throw new RuntimeException("Map operator initialization failed", e);
+ }
+ }
+ }
+
+ public void map(Object key, Object value, OutputCollector output,
+ Reporter reporter) throws IOException {
+ if (oc == null) {
+ oc = output;
+ rp = reporter;
+ OperatorUtils.setChildrenCollector(mo.getChildOperators(), output);
+ mo.setReporter(rp);
+ MapredContext.get().setReporter(reporter);
+ }
+ // reset the execContext for each new row
+ execContext.resetRow();
+
+ try {
+ if (mo.getDone()) {
+ done = true;
+ } else {
+ // Since there is no concept of a group, we don't invoke
+ // startGroup/endGroup for a mapper
+ mo.process((Writable)value);
+ if (isLogInfoEnabled) {
+ numRows++;
+ if (numRows == nextCntr) {
+ long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ l4j.info("ExecMapper: processing " + numRows
+ + " rows: used memory = " + used_memory);
+ nextCntr = getNextCntr(numRows);
+ }
+ }
+ }
+ } catch (Throwable e) {
+ abort = true;
+ if (e instanceof OutOfMemoryError) {
+ // Don't create a new object if we are already out of memory
+ throw (OutOfMemoryError) e;
+ } else {
+ l4j.fatal(StringUtils.stringifyException(e));
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+
+ private long getNextCntr(long cntr) {
+ // A very simple counter to keep track of the number of rows processed by the
+ // mapper. It logs every 1 million rows after the first million, and at
+ // powers of ten before that.
+ if (cntr >= 1000000) {
+ return cntr + 1000000;
+ }
+
+ return 10 * cntr;
+ }
+
+ public void close() {
+ // No row was processed
+ if (oc == null) {
+ l4j.trace("Close called. no row processed by map.");
+ }
+
+ // check if there are IOExceptions
+ if (!abort) {
+ abort = execContext.getIoCxt().getIOExceptions();
+ }
+
+ // detecting failed executions by exceptions thrown by the operator tree
+ // ideally hadoop should let us know whether map execution failed or not
+ try {
+ mo.close(abort);
+
+ // close the local work
+ if(localWork != null){
+ List<Operator<? extends OperatorDesc>> dummyOps = localWork.getDummyParentOp();
+
+ for (Operator<? extends OperatorDesc> dummyOp : dummyOps){
+ dummyOp.close(abort);
+ }
+ }
+
+ if (isLogInfoEnabled) {
+ long used_memory = memoryMXBean.getHeapMemoryUsage().getUsed();
+ l4j.info("ExecMapper: processed " + numRows + " rows: used memory = "
+ + used_memory);
+ }
+
+ ReportStats rps = new ReportStats(rp);
+ mo.preorderMap(rps);
+ return;
+ } catch (Exception e) {
+ if (!abort) {
+ // signal new failure to map-reduce
+ l4j.error("Hit error while closing operators - failing tree");
+ throw new RuntimeException("Hive Runtime Error while closing operators", e);
+ }
+ } finally {
+ MapredContext.close();
+ Utilities.clearWorkMap();
+ }
+ }
+
+ public boolean getDone() {
+ return done;
+ }
+
+ public boolean isAbort() {
+ return abort;
+ }
+
+ public void setAbort(boolean abort) {
+ this.abort = abort;
+ }
+
+ public void setDone(boolean done) {
+ this.done = done;
+ }
+
+ /**
+ * reportStats.
+ *
+ */
+ public static class ReportStats implements Operator.OperatorFunc {
+ private final Reporter rp;
+
+ public ReportStats(Reporter rp) {
+ this.rp = rp;
+ }
+
+ public void func(Operator op) {
+ Map<Enum<?>, Long> opStats = op.getStats();
+ for (Map.Entry<Enum<?>, Long> e : opStats.entrySet()) {
+ if (rp != null) {
+ rp.incrCounter(e.getKey(), e.getValue());
+ }
+ }
+ }
+ }
+}
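In use, HiveMapFunctionResultList (modified above) drives the handler through a configure / map-per-record / close lifecycle. A minimal sketch of that calling pattern (the driver class here is illustrative only; the handler calls match the new API shown above):

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.hive.ql.exec.spark.SparkMapRecordHandler;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;
    import scala.Tuple2;

    // Hypothetical driver mirroring what HiveMapFunctionResultList does for one partition.
    public class HandlerLifecycleSketch {
      static void run(JobConf jobConf,
                      Iterator<Tuple2<BytesWritable, BytesWritable>> input,
                      OutputCollector<BytesWritable, BytesWritable> collector) throws IOException {
        SparkMapRecordHandler handler = new SparkMapRecordHandler();
        handler.configure(jobConf);   // build the operator tree from the map plan
        while (input.hasNext() && !handler.getDone()) {
          Tuple2<BytesWritable, BytesWritable> rec = input.next();
          handler.map(rec._1(), rec._2(), collector, Reporter.NULL);  // forward one key/value pair
        }
        handler.close();              // close operators and local work, report stats
      }
    }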