You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by se...@apache.org on 2014/04/26 00:11:36 UTC
svn commit: r1590164 - in /hive/trunk/ql/src/java/org/apache/hadoop/hive/ql:
exec/tez/DagUtils.java exec/tez/TezSessionPoolManager.java
exec/tez/TezTask.java session/SessionState.java
Author: sershe
Date: Fri Apr 25 22:11:35 2014
New Revision: 1590164
URL: http://svn.apache.org/r1590164
Log:
HIVE-6898 : Functions in hive are failing with java.lang.ClassNotFoundException on Tez (Vikram Dixit K, reviewed by Sergey Shelukhin)
Modified:
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1590164&r1=1590163&r2=1590164&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Fri Apr 25 22:11:35 2014
@@ -17,10 +17,6 @@
*/
package org.apache.hadoop.hive.ql.exec.tez;
-import com.google.common.base.Function;
-import com.google.common.collect.Iterators;
-import com.google.common.collect.Lists;
-
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
@@ -32,7 +28,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import javax.security.auth.login.LoginException;
@@ -45,7 +40,6 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
@@ -85,6 +79,8 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.URL;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.tez.client.PreWarmContext;
+import org.apache.tez.client.TezSessionConfiguration;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.EdgeManagerDescriptor;
@@ -96,13 +92,11 @@ import org.apache.tez.dag.api.GroupInput
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.OutputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.Vertex;
-import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
-import org.apache.tez.dag.api.VertexLocationHint;
import org.apache.tez.dag.api.TezException;
-import org.apache.tez.client.PreWarmContext;
-import org.apache.tez.client.TezSessionConfiguration;
+import org.apache.tez.dag.api.Vertex;
import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.api.VertexLocationHint;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
import org.apache.tez.mapreduce.common.MRInputAMSplitGenerator;
import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
import org.apache.tez.mapreduce.hadoop.MRHelpers;
@@ -118,6 +112,10 @@ import org.apache.tez.runtime.library.ou
import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
+import com.google.common.base.Function;
+import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
/**
* DagUtils. DagUtils is a collection of helper methods to convert
* map and reduce work to tez vertices and edges. It handles configuration
@@ -254,7 +252,7 @@ public class DagUtils {
/**
* Given two vertices a, b update their configurations to be used in an Edge a-b
*/
- public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
+ public void updateConfigurationForEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
throws IOException {
// Tez needs to setup output subsequent input pairs correctly
@@ -311,13 +309,13 @@ public class DagUtils {
break;
case CUSTOM_EDGE:
-
+
dataMovementType = DataMovementType.CUSTOM;
logicalOutputClass = OnFileUnorderedPartitionedKVOutput.class;
logicalInputClass = ShuffledUnorderedKVInput.class;
EdgeManagerDescriptor edgeDesc = new EdgeManagerDescriptor(
CustomPartitionEdge.class.getName());
- CustomEdgeConfiguration edgeConf =
+ CustomEdgeConfiguration edgeConf =
new CustomEdgeConfiguration(edgeProp.getNumBuckets(), null);
DataOutputBuffer dob = new DataOutputBuffer();
edgeConf.write(dob);
@@ -432,7 +430,7 @@ public class DagUtils {
}
if (vertexHasCustomInput) {
useTezGroupedSplits = false;
- // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
+ // grouping happens in execution phase. Setting the class to TezGroupedSplitsInputFormat
// here would cause pre-mature grouping which would be incorrect.
inputFormatClass = HiveInputFormat.class;
conf.setClass("mapred.input.format.class", HiveInputFormat.class, InputFormat.class);
@@ -661,6 +659,11 @@ public class DagUtils {
String hdfsDirPathStr, Configuration conf) throws IOException, LoginException {
List<LocalResource> tmpResources = new ArrayList<LocalResource>();
+ addTempFiles(conf, tmpResources, hdfsDirPathStr, getTempFilesFromConf(conf));
+ return tmpResources;
+ }
+
+ public static String[] getTempFilesFromConf(Configuration conf) {
String addedFiles = Utilities.getResourceFiles(conf, SessionState.ResourceType.FILE);
if (StringUtils.isNotBlank(addedFiles)) {
HiveConf.setVar(conf, ConfVars.HIVEADDEDFILES, addedFiles);
@@ -679,8 +682,7 @@ public class DagUtils {
// need to localize the additional jars and files
// we need the directory on hdfs to which we shall put all these files
String allFiles = auxJars + "," + addedJars + "," + addedFiles + "," + addedArchives;
- addTempFiles(conf, tmpResources, hdfsDirPathStr, allFiles.split(","));
- return tmpResources;
+ return allFiles.split(",");
}
/**
@@ -900,7 +902,7 @@ public class DagUtils {
* @param work The instance of BaseWork representing the actual work to be performed
* by this vertex.
* @param scratchDir HDFS scratch dir for this execution unit.
- * @param list
+ * @param list
* @param appJarLr Local resource for hive-exec.
* @param additionalLr
* @param fileSystem FS corresponding to scratchDir and LocalResources
@@ -908,7 +910,7 @@ public class DagUtils {
* @return Vertex
*/
public Vertex createVertex(JobConf conf, BaseWork work,
- Path scratchDir, LocalResource appJarLr,
+ Path scratchDir, LocalResource appJarLr,
List<LocalResource> additionalLr,
FileSystem fileSystem, Context ctx, boolean hasChildren, TezWork tezWork) throws Exception {
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java?rev=1590164&r1=1590163&r2=1590164&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Fri Apr 25 22:11:35 2014
@@ -21,8 +21,6 @@ package org.apache.hadoop.hive.ql.exec.t
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
-import javax.security.auth.login.LoginException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -106,7 +104,8 @@ public class TezSessionPoolManager {
}
}
- private TezSessionState getSession(HiveConf conf, boolean doOpen)
+ private TezSessionState getSession(HiveConf conf, boolean doOpen,
+ boolean forceCreate)
throws Exception {
String queueName = conf.get("tez.queue.name");
@@ -119,8 +118,9 @@ public class TezSessionPoolManager {
* their own credentials. We expect that with the new security model, things will
* run as user hive in most cases.
*/
- if (!(this.inited) || ((queueName != null) && (!queueName.isEmpty()))
- || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) {
+ if (forceCreate || !(this.inited)
+ || ((queueName != null) && (!queueName.isEmpty()))
+ || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) {
LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser +
" defaultQueuePool: " + defaultQueuePool +
" blockingQueueLength: " + blockingQueueLength);
@@ -156,7 +156,7 @@ public class TezSessionPoolManager {
public void returnSession(TezSessionState tezSessionState)
throws Exception {
if (tezSessionState.isDefault()) {
- LOG.info("The session " + tezSessionState.getSessionId()
+ LOG.info("The session " + tezSessionState.getSessionId()
+ " belongs to the pool. Put it back in");
SessionState sessionState = SessionState.get();
if (sessionState != null) {
@@ -194,15 +194,7 @@ public class TezSessionPoolManager {
public TezSessionState getSession(
TezSessionState session, HiveConf conf, boolean doOpen) throws Exception {
- if (canWorkWithSameSession(session, conf)) {
- return session;
- }
-
- if (session != null) {
- session.close(false);
- }
-
- return getSession(conf, doOpen);
+ return getSession(session, conf, doOpen, false);
}
/*
@@ -262,4 +254,17 @@ public class TezSessionPoolManager {
return true;
}
+
+ public TezSessionState getSession(TezSessionState session, HiveConf conf,
+ boolean doOpen, boolean forceCreate) throws Exception {
+ if (canWorkWithSameSession(session, conf)) {
+ return session;
+ }
+
+ if (session != null) {
+ session.close(false);
+ }
+
+ return getSession(conf, doOpen, forceCreate);
+ }
}
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1590164&r1=1590163&r2=1590164&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Fri Apr 25 22:11:35 2014
@@ -23,7 +23,6 @@ import java.net.URISyntaxException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -34,7 +33,6 @@ import javax.security.auth.login.LoginEx
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.ql.Context;
import org.apache.hadoop.hive.ql.DriverContext;
import org.apache.hadoop.hive.ql.exec.Operator;
@@ -52,7 +50,6 @@ import org.apache.hadoop.hive.ql.session
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.tez.client.TezSession;
import org.apache.tez.common.counters.CounterGroup;
import org.apache.tez.common.counters.TezCounter;
import org.apache.tez.common.counters.TezCounters;
@@ -80,7 +77,7 @@ public class TezTask extends Task<TezWor
private TezCounters counters;
- private DagUtils utils;
+ private final DagUtils utils;
public TezTask() {
this(DagUtils.getInstance());
@@ -129,12 +126,22 @@ public class TezTask extends Task<TezWor
// create the tez tmp dir
scratchDir = utils.createTezDir(scratchDir, conf);
+ boolean hasResources = session.hasResources(inputOutputJars);
+
+ if (ss.hasAddedResource()) {
+ // need to re-launch session because of new added jars.
+ hasResources = false;
+ // reset the added resource flag for this session since we would
+ // relocalize (either by restarting or relocalizing) due to the above
+ // hasResources flag.
+ ss.setAddedResource(false);
+ }
// If we have any jars from input format, we need to restart the session because
// AM will need them; so, AM has to be restarted. What a mess...
- if (!session.hasResources(inputOutputJars) && session.isOpen()) {
+ if (!hasResources && session.isOpen()) {
LOG.info("Tez session being reopened to pass custom jars to AM");
- session.close(false);
+ TezSessionPoolManager.getInstance().close(session);
session = TezSessionPoolManager.getInstance().getSession(null, conf, false);
ss.setTezSession(session);
}
@@ -246,15 +253,15 @@ public class TezTask extends Task<TezWor
vertexArray[i++] = workToVertex.get(v);
}
VertexGroup group = dag.createVertexGroup(w.getName(), vertexArray);
-
+
// now hook up the children
for (BaseWork v: children) {
// need to pairwise patch up the configuration of the vertices
for (BaseWork part: unionWorkItems) {
- utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
+ utils.updateConfigurationForEdge(workToConf.get(part), workToVertex.get(part),
workToConf.get(v), workToVertex.get(v));
}
-
+
// finally we can create the grouped edge
GroupInputEdge e = utils.createEdge(group, workToConf.get(v),
workToVertex.get(v), work.getEdgeProperty(w, v));
@@ -264,14 +271,14 @@ public class TezTask extends Task<TezWor
} else {
// Regular vertices
JobConf wxConf = utils.initializeVertexConf(conf, w);
- Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr,
+ Vertex wx = utils.createVertex(wxConf, w, scratchDir, appJarLr,
additionalLr, fs, ctx, !isFinal, work);
dag.addVertex(wx);
utils.addCredentials(w, dag);
perfLogger.PerfLogEnd(CLASS_NAME, PerfLogger.TEZ_CREATE_VERTEX + w.getName());
workToVertex.put(w, wx);
workToConf.put(w, wxConf);
-
+
// add all dependencies (i.e.: edges) to the graph
for (BaseWork v: work.getChildren(w)) {
assert workToVertex.containsKey(v);
Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java?rev=1590164&r1=1590163&r2=1590164&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java Fri Apr 25 22:11:35 2014
@@ -163,6 +163,8 @@ public class SessionState {
private final String CONFIG_AUTHZ_SETTINGS_APPLIED_MARKER =
"hive.internal.ss.authz.settings.applied.marker";
+ private boolean addedResource;
+
/**
* Lineage state.
*/
@@ -736,6 +738,7 @@ public class SessionState {
getConsole().printInfo("Added resource: " + fnlVal);
resourceMap.add(fnlVal);
+ addedResource = true;
return fnlVal;
}
@@ -1033,4 +1036,11 @@ public class SessionState {
}
+ public boolean hasAddedResource() {
+ return addedResource;
+ }
+
+ public void setAddedResource(boolean addedResouce) {
+ this.addedResource = addedResouce;
+ }
}