You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vi...@apache.org on 2014/08/09 01:23:56 UTC
svn commit: r1616904 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez: CustomPartitionEdge.java DagUtils.java HiveSplitGenerator.java TezSessionState.java
Author: vikram
Date: Fri Aug 8 23:23:55 2014
New Revision: 1616904
URL: http://svn.apache.org/r1616904
Log:
Bring the tez branch up to date with the API changes made by TEZ-1372 (Gopal V via Vikram Dixit)
Modified:
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java?rev=1616904&r1=1616903&r2=1616904&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/CustomPartitionEdge.java Fri Aug 8 23:23:55 2014
@@ -43,6 +43,7 @@ public class CustomPartitionEdge extends
// used by the framework at runtime. initialize is the real initializer at runtime
public CustomPartitionEdge(EdgeManagerContext context) {
super(context);
+ this.context = context;
}
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1616904&r1=1616903&r2=1616904&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Aug 8 23:23:55 2014
@@ -79,7 +79,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
-import org.apache.tez.client.PreWarmContext;
+import org.apache.tez.client.PreWarmVertex;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.DataSinkDescriptor;
import org.apache.tez.dag.api.DataSourceDescriptor;
@@ -380,6 +380,15 @@ public class DagUtils {
}
/*
+ * Helper to setup default environment for a task in YARN.
+ */
+ private Map<String, String> getContainerEnvironment(Configuration conf, boolean isMap) {
+ Map<String, String> environment = new HashMap<String, String>();
+ MRHelpers.updateEnvironmentForMRTasks(conf, environment, isMap);
+ return environment;
+ }
+
+ /*
* Helper to determine what java options to use for the containers
* Falls back to Map-reduces map java opts if no tez specific options
* are set
@@ -449,7 +458,7 @@ public class DagUtils {
// is HiveInputFormat
if (inputFormatClass == HiveInputFormat.class) {
useTezGroupedSplits = true;
- conf.setClass("mapred.input.format.class", TezGroupedSplitsInputFormat.class, InputFormat.class);
+ conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
}
}
@@ -476,9 +485,7 @@ public class DagUtils {
map = new Vertex(mapWork.getName(),
new ProcessorDescriptor(MapTezProcessor.class.getName()).
setUserPayload(serializedConf), numTasks, getContainerResource(conf));
- Map<String, String> environment = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
- map.setTaskEnvironment(environment);
+ map.setTaskEnvironment(getContainerEnvironment(conf, true));
map.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
assert mapWork.getAliasToWork().keySet().size() == 1;
@@ -487,10 +494,9 @@ public class DagUtils {
byte[] mrInput = null;
if (useTezGroupedSplits) {
- mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf,
- HiveInputFormat.class.getName());
+ mrInput = MRHelpers.createMRInputPayloadWithGrouping(serializedConf);
} else {
- mrInput = MRHelpers.createMRInputPayload(serializedConf, null);
+ mrInput = MRHelpers.createMRInputPayload(serializedConf);
}
map.addDataSource(alias,
new DataSourceDescriptor(new InputDescriptor(MRInputLegacy.class.getName()).
@@ -550,11 +556,7 @@ public class DagUtils {
reduceWork.isAutoReduceParallelism() ? reduceWork.getMaxReduceTasks() : reduceWork
.getNumReduceTasks(), getContainerResource(conf));
- Map<String, String> environment = new HashMap<String, String>();
-
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, false);
- reducer.setTaskEnvironment(environment);
-
+ reducer.setTaskEnvironment(getContainerEnvironment(conf, false));
reducer.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
Map<String, LocalResource> localResources = new HashMap<String, LocalResource>();
@@ -598,17 +600,16 @@ public class DagUtils {
/**
* @param numContainers number of containers to pre-warm
* @param localResources additional resources to pre-warm with
- * @return prewarm context object
+ * @return prewarm vertex to run
*/
- public PreWarmContext createPreWarmContext(TezConfiguration conf,
+ public PreWarmVertex createPreWarmVertex(TezConfiguration conf,
int numContainers, Map<String, LocalResource> localResources) throws
IOException, TezException {
ProcessorDescriptor prewarmProcDescriptor = new ProcessorDescriptor(HivePreWarmProcessor.class.getName());
prewarmProcDescriptor.setUserPayload(MRHelpers.createUserPayloadFromConf(conf));
- PreWarmContext context = new PreWarmContext(prewarmProcDescriptor, getContainerResource(conf),
- numContainers, null);
+ PreWarmVertex prewarmVertex = new PreWarmVertex("prewarm", prewarmProcDescriptor, numContainers,getContainerResource(conf));
Map<String, LocalResource> combinedResources = new HashMap<String, LocalResource>();
@@ -616,14 +617,10 @@ public class DagUtils {
combinedResources.putAll(localResources);
}
- context.setLocalResources(combinedResources);
-
- /* boiler plate task env */
- Map<String, String> environment = new HashMap<String, String>();
- MRHelpers.updateEnvironmentForMRTasks(conf, environment, true);
- context.setEnvironment(environment);
- context.setJavaOpts(getContainerJavaOpts(conf));
- return context;
+ prewarmVertex.setTaskLocalFiles(localResources);
+ prewarmVertex.setTaskLaunchCmdOpts(getContainerJavaOpts(conf));
+ prewarmVertex.setTaskEnvironment(getContainerEnvironment(conf, false));
+ return prewarmVertex;
}
/**
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java?rev=1616904&r1=1616903&r2=1616904&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/HiveSplitGenerator.java Fri Aug 8 23:23:55 2014
@@ -90,7 +90,7 @@ public class HiveSplitGenerator extends
ShimLoader.getHadoopShims().getMergedCredentials(jobConf);
InputSplitInfoMem inputSplitInfo = null;
- String realInputFormatName = userPayloadProto.getInputFormatName();
+ String realInputFormatName = conf.get("mapred.input.format.class");
if (realInputFormatName != null && !realInputFormatName.isEmpty()) {
// Need to instantiate the realInputFormat
InputFormat<?, ?> inputFormat =
@@ -123,7 +123,8 @@ public class HiveSplitGenerator extends
inputSplitInfo =
new InputSplitInfoMem(flatSplits, locationHints, flatSplits.length, null, jobConf);
} else {
- inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf);
+ // no need for grouping and the target #of tasks.
+ inputSplitInfo = MRHelpers.generateInputSplitsToMem(jobConf, false, 0);
}
return createEventList(sendSerializedEvents, inputSplitInfo);
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java?rev=1616904&r1=1616903&r2=1616904&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java Fri Aug 8 23:23:55 2014
@@ -50,11 +50,12 @@ import org.apache.hadoop.hive.ql.exec.Ut
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.PreWarmContext;
import org.apache.tez.client.TezClient;
+import org.apache.tez.client.PreWarmVertex;
import org.apache.tez.dag.api.SessionNotRunning;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
/**
@@ -170,6 +171,15 @@ public class TezSessionState {
// generate basic tez config
TezConfiguration tezConfig = new TezConfiguration(conf);
tezConfig.set(TezConfiguration.TEZ_AM_STAGING_DIR, tezScratchDir.toUri().toString());
+
+ if (HiveConf.getBoolVar(conf, ConfVars.HIVE_PREWARM_ENABLED)) {
+ int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
+ n = Math.max(tezConfig.getInt(
+ TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS,
+ TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS_DEFAULT), n);
+ tezConfig.setInt(TezConfiguration.TEZ_AM_SESSION_MIN_HELD_CONTAINERS, n);
+ }
+
session = new TezClient("HIVE-" + sessionId, tezConfig, true,
commonLocalResources, null);
@@ -182,10 +192,10 @@ public class TezSessionState {
int n = HiveConf.getIntVar(conf, ConfVars.HIVE_PREWARM_NUM_CONTAINERS);
LOG.info("Prewarming " + n + " containers (id: " + sessionId
+ ", scratch dir: " + tezScratchDir + ")");
- PreWarmContext context = utils.createPreWarmContext(tezConfig, n,
+ PreWarmVertex prewarmVertex = utils.createPreWarmVertex(tezConfig, n,
commonLocalResources);
try {
- session.preWarm(context);
+ session.preWarm(prewarmVertex);
} catch (InterruptedException ie) {
if (LOG.isDebugEnabled()) {
LOG.debug("Hive Prewarm threw an exception ", ie);