You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pig.apache.org by jc...@apache.org on 2012/07/16 20:36:15 UTC
svn commit: r1362181 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/impl/PigContext.java
test/org/apache/pig/test/TestRegisteredJarVisibility.java
test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java
Author: jcoveney
Date: Mon Jul 16 18:36:14 2012
New Revision: 1362181
URL: http://svn.apache.org/viewvc?rev=1362181&view=rev
Log:
PIG-2815: class loader management in PigContext (rangadi via jcoveney)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/impl/PigContext.java
pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java
pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1362181&r1=1362180&r2=1362181&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jul 16 18:36:14 2012
@@ -194,6 +194,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-2815: class loader management in PigContext (rangadi via jcoveney)
+
PIG-2813: Fix test regressions from PIG-2632 (jcoveney)
PIG-2806: Fix merge join test regression from PIG-2632 (jcoveney)
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1362181&r1=1362180&r2=1362181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Mon Jul 16 18:36:14 2012
@@ -58,85 +58,104 @@ import org.apache.pig.impl.util.JarManag
public class PigContext implements Serializable {
private static final long serialVersionUID = 1L;
-
+
private static final Log log = LogFactory.getLog(PigContext.class);
-
+
public static final String JOB_NAME = "jobName";
public static final String JOB_NAME_PREFIX= "PigLatin";
public static final String JOB_PRIORITY = "jobPriority";
public static final String PIG_CMD_ARGS_REMAINDERS = "pig.cmd.args.remainders";
-
-
- /* NOTE: we only serialize some of the stuff
- *
- *(to make it smaller given that it's not all needed on the Hadoop side,
+
+
+ /* NOTE: we only serialize some of the stuff
+ *
+ *(to make it smaller given that it's not all needed on the Hadoop side,
* and also because some is not serializable e.g. the Configuration)
*/
-
+
//one of: local, mapreduce, pigbody
- private ExecType execType;;
+ private ExecType execType;;
// extra jar files that are needed to run a job
- transient public List<URL> extraJars = new LinkedList<URL>();
-
+ transient public List<URL> extraJars = new LinkedList<URL>();
+
// The jars that should not be merged in. (Some functions may come from pig.jar and we don't want the whole jar file.)
- transient public Vector<String> skipJars = new Vector<String>(2);
-
+ transient public Vector<String> skipJars = new Vector<String>(2);
+
//main file system that jobs and shell commands access
- transient private DataStorage dfs;
-
+ transient private DataStorage dfs;
+
// local file system, where jar files, etc. reside
- transient private DataStorage lfs;
-
+ transient private DataStorage lfs;
+
// handle to the back-end
transient private HExecutionEngine executionEngine;
-
+
private Properties properties;
-
+
// script files that are needed to run a job
@Deprecated
public List<String> scriptFiles = new ArrayList<String>();
private Map<String,File> aliasedScriptFiles = new LinkedHashMap<String,File>();
-
+
// script jars that are needed to run a script - jython.jar etc
public List<String> scriptJars = new ArrayList<String>(2);
-
+
/**
* a table mapping function names to function specs.
*/
private Map<String, FuncSpec> definedFunctions = new HashMap<String, FuncSpec>();
-
+
/**
* a table mapping names to streaming commands.
*/
- private Map<String, StreamingCommand> definedCommands =
+ private Map<String, StreamingCommand> definedCommands =
new HashMap<String, StreamingCommand>();
- private static ThreadLocal<ArrayList<String>> packageImportList =
+ private static ThreadLocal<ArrayList<String>> packageImportList =
new ThreadLocal<ArrayList<String>>();
private Properties log4jProperties = new Properties();
-
- private Level defaultLogLevel;
-
+
+ private Level defaultLogLevel;
+
public int defaultParallel = -1;
// Says, whether we're processing an explain right now. Explain
// might skip some check in the logical plan validation (file
// existence checks, etc).
public boolean inExplain = false;
-
+
// whether we're processing an ILLUSTRATE right now.
public boolean inIllustrator = false;
-
+
private String last_alias = null;
// List of paths skipped for automatic shipping
List<String> skippedShipPaths = new ArrayList<String>();
- static private ClassLoader classloader = PigContext.class.getClassLoader();
-
- private static Map<String, Class> classCache = new HashMap<String, Class>();;
+ /**
+ * extends URLClassLoader to allow adding to classpath as new jars
+ * are registered.
+ */
+ private static class ContextClassLoader extends URLClassLoader {
+
+ public ContextClassLoader(ClassLoader classLoader) {
+ this(new URL[0], classLoader);
+ }
+
+ public ContextClassLoader(URL[] urls, ClassLoader classLoader) {
+ super(urls, classLoader);
+ }
+
+ @Override
+ public void addURL(URL url) {
+ super.addURL(url);
+ }
+ };
+
+ static private ContextClassLoader classloader = new ContextClassLoader(PigContext.class.getClassLoader());
+
private List<String> params;
public List<String> getParams() {
@@ -155,14 +174,14 @@ public class PigContext implements Seria
this.paramFiles = paramFiles;
}
private List<String> paramFiles;
-
+
public PigContext() {
this(ExecType.MAPREDUCE, new Properties());
}
-
+
public PigContext(ExecType execType, Properties properties){
this.execType = execType;
- this.properties = properties;
+ this.properties = properties;
this.properties.setProperty("exectype", this.execType.name());
String pigJar = JarManager.findContainingJar(Main.class);
@@ -172,9 +191,9 @@ public class PigContext implements Seria
if (!pigJar.equals(hadoopJar))
skipJars.add(hadoopJar);
}
-
+
executionEngine = null;
-
+
// Add the default paths to be skipped for auto-shipping of commands
skippedShipPaths.add("/bin");
skippedShipPaths.add("/usr/bin");
@@ -183,7 +202,7 @@ public class PigContext implements Seria
skippedShipPaths.add("/usr/sbin");
skippedShipPaths.add("/usr/local/sbin");
}
-
+
public static void initializeImportList(String importListCommandLineProperties)
{
StringTokenizer tokenizer = new StringTokenizer(importListCommandLineProperties, ":");
@@ -198,7 +217,7 @@ public class PigContext implements Seria
pos++;
}
}
-
+
public void connect() throws ExecException {
switch (execType) {
@@ -208,14 +227,14 @@ public class PigContext implements Seria
executionEngine = new HExecutionEngine (this);
executionEngine.init();
-
+
dfs = executionEngine.getDataStorage();
-
+
lfs = new HDataStorage(URI.create("file:///"),
- properties);
+ properties);
}
break;
-
+
default:
{
int errCode = 2040;
@@ -229,7 +248,7 @@ public class PigContext implements Seria
public void setJobtrackerLocation(String newLocation) {
Properties trackerLocation = new Properties();
trackerLocation.setProperty("mapred.job.tracker", newLocation);
-
+
try {
executionEngine.updateConfiguration(trackerLocation);
}
@@ -260,18 +279,18 @@ public class PigContext implements Seria
aliasedScriptFiles.put(name.replaceFirst("^/", ""), new File(path));
}
}
-
+
public void addJar(String path) throws MalformedURLException {
if (path != null) {
URL resource = (new File(path)).toURI().toURL();
addJar(resource);
}
}
-
+
public void addJar(URL resource) throws MalformedURLException{
if (resource != null) {
extraJars.add(resource);
- PigContext.classloader = createCl(null);
+ classloader.addURL(resource);
Thread.currentThread().setContextClassLoader(PigContext.classloader);
}
}
@@ -288,18 +307,18 @@ public class PigContext implements Seria
if (oldName.equals(newName)) {
return;
}
-
+
System.out.println("Renaming " + oldName + " to " + newName);
ElementDescriptor dst = null;
- ElementDescriptor src = null;
+ ElementDescriptor src = null;
try {
dst = dfs.asElement(newName);
- src = dfs.asElement(oldName);
+ src = dfs.asElement(oldName);
}
catch (DataStorageException e) {
- byte errSrc = getErrorSource();
+ byte errSrc = getErrorSource();
int errCode = 0;
switch(errSrc) {
case PigException.REMOTE_ENVIRONMENT:
@@ -312,25 +331,25 @@ public class PigContext implements Seria
errCode = 2038;
break;
}
- String msg = "Unable to rename " + oldName + " to " + newName;
+ String msg = "Unable to rename " + oldName + " to " + newName;
throw new ExecException(msg, errCode, errSrc, e);
}
if (dst.exists()) {
dst.delete();
}
-
+
src.rename(dst);
}
public void copy(String src, String dst, boolean localDst) throws IOException {
DataStorage dstStorage = dfs;
-
+
if (localDst) {
dstStorage = lfs;
}
-
+
ElementDescriptor srcElement = null;
ElementDescriptor dstElement = null;
@@ -339,7 +358,7 @@ public class PigContext implements Seria
dstElement = dstStorage.asElement(dst);
}
catch (DataStorageException e) {
- byte errSrc = getErrorSource();
+ byte errSrc = getErrorSource();
int errCode = 0;
switch(errSrc) {
case PigException.REMOTE_ENVIRONMENT:
@@ -352,13 +371,13 @@ public class PigContext implements Seria
errCode = 2039;
break;
}
- String msg = "Unable to copy " + src + " to " + dst;
+ String msg = "Unable to copy " + src + " to " + dst;
throw new ExecException(msg, errCode, errSrc, e);
}
-
+
srcElement.copy(dstElement, this.properties, false);
}
-
+
public HExecutionEngine getExecutionEngine() {
return executionEngine;
}
@@ -374,17 +393,17 @@ public class PigContext implements Seria
public DataStorage getFs() {
return dfs;
}
-
+
/**
* Provides configuration information.
- *
+ *
* @return - information about the configuration used to connect to
* execution engine
*/
public Properties getProperties() {
return this.properties;
}
-
+
/**
* @deprecated use {@link #getProperties()} instead
*/
@@ -403,13 +422,13 @@ public class PigContext implements Seria
/**
* Defines an alias for the given function spec. This
- * is useful for functions that require arguments to the
+ * is useful for functions that require arguments to the
* constructor.
- *
+ *
* @param function - the new function alias to define.
- * @param functionSpec - the FuncSpec object representing the name of
+ * @param functionSpec - the FuncSpec object representing the name of
* the function class and any arguments to constructor.
- *
+ *
*/
public void registerFunction(String function, FuncSpec functionSpec) {
if (functionSpec == null) {
@@ -421,11 +440,11 @@ public class PigContext implements Seria
/**
* Defines an alias for the given streaming command.
- *
+ *
* This is useful for complicated streaming command specs.
- *
+ *
* @param alias - the new command alias to define.
- * @param command - the command
+ * @param command - the command
*/
public void registerStreamCmd(String alias, StreamingCommand command) {
if (command == null) {
@@ -437,19 +456,16 @@ public class PigContext implements Seria
/**
* Returns the type of execution currently in effect.
- *
+ *
* @return current execution type
*/
public ExecType getExecType() {
return execType;
}
-
-
-
/**
* Creates a Classloader based on the passed jarFile and any extra jar files.
- *
+ *
* @param jarFile
* the jar file to be part of the newly created Classloader. This jar file plus any
* jars in the extraJars list will constitute the classpath.
@@ -467,30 +483,26 @@ public class PigContext implements Seria
urls[i + passedJar] = extraJars.get(i);
}
//return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
- return new URLClassLoader(urls, PigContext.class.getClassLoader());
+ return new ContextClassLoader(urls, PigContext.class.getClassLoader());
}
-
+
@SuppressWarnings("rawtypes")
public static Class resolveClassName(String name) throws IOException{
- if (classCache.containsKey(name)) {
- return classCache.get(name);
- }
for(String prefix: getPackageImportList()) {
Class c;
try {
c = Class.forName(prefix+name,true, PigContext.classloader);
- classCache.put(name, c);
return c;
- }
+ }
catch (ClassNotFoundException e) {
// do nothing
- }
+ }
catch (UnsupportedClassVersionError e) {
int errCode = 1069;
String msg = "Problem resolving class version numbers for class " + name;
throw new ExecException(msg, errCode, PigException.INPUT, e) ;
}
-
+
}
// create ClassNotFoundException exception and attach to IOException
@@ -541,7 +553,7 @@ public class PigContext implements Seria
@SuppressWarnings({ "unchecked", "rawtypes" })
public static Object instantiateFuncFromSpec(FuncSpec funcSpec) {
Object ret;
- String className =funcSpec.getClassName();
+ String className =funcSpec.getClassName();
String[] args = funcSpec.getCtorArgs();
Class objClass = null ;
@@ -596,11 +608,11 @@ public class PigContext implements Seria
}
return ret;
}
-
+
public static Object instantiateFuncFromSpec(String funcSpec) {
return instantiateFuncFromSpec(new FuncSpec(funcSpec));
- }
-
+ }
+
@SuppressWarnings("rawtypes")
public Class getClassForAlias(String alias) throws IOException{
String className = null;
@@ -615,7 +627,7 @@ public class PigContext implements Seria
}
return resolveClassName(className);
}
-
+
public Object instantiateFuncFromAlias(String alias) throws IOException {
FuncSpec funcSpec;
if (definedFunctions != null && (funcSpec = definedFunctions.get(alias))!=null)
@@ -626,21 +638,21 @@ public class PigContext implements Seria
/**
* Get the {@link StreamingCommand} for the given alias.
- *
+ *
* @param alias the alias for the <code>StreamingCommand</code>
* @return <code>StreamingCommand</code> for the alias
*/
public StreamingCommand getCommandForAlias(String alias) {
return definedCommands.get(alias);
}
-
+
public void setExecType(ExecType execType) {
this.execType = execType;
}
-
+
/**
* Create a new {@link ExecutableManager} depending on the ExecType.
- *
+ *
* @return a new {@link ExecutableManager} depending on the ExecType
* @throws ExecException
*/
@@ -649,7 +661,7 @@ public class PigContext implements Seria
switch (execType) {
case LOCAL:
- case MAPREDUCE:
+ case MAPREDUCE:
{
executableManager = new HadoopExecutableManager();
}
@@ -661,7 +673,7 @@ public class PigContext implements Seria
throw new ExecException(msg, errCode, PigException.BUG);
}
}
-
+
return executableManager;
}
@@ -674,29 +686,29 @@ public class PigContext implements Seria
}
/**
- * Add a path to be skipped while automatically shipping binaries for
+ * Add a path to be skipped while automatically shipping binaries for
* streaming.
- *
+ *
* @param path path to be skipped
*/
public void addPathToSkip(String path) {
skippedShipPaths.add(path);
}
-
+
/**
* Get paths which are to skipped while automatically shipping binaries for
* streaming.
- *
- * @return paths which are to skipped while automatically shipping binaries
+ *
+ * @return paths which are to skipped while automatically shipping binaries
* for streaming
*/
public List<String> getPathsToSkip() {
return skippedShipPaths;
}
-
+
/**
* Check the execution mode and return the appropriate error source
- *
+ *
* @return error source
*/
public byte getErrorSource() {
@@ -704,24 +716,24 @@ public class PigContext implements Seria
return PigException.REMOTE_ENVIRONMENT;
} else {
return PigException.BUG;
- }
+ }
}
-
+
public static ArrayList<String> getPackageImportList() {
if (packageImportList.get() == null) {
ArrayList<String> importlist = new ArrayList<String>();
importlist.add("");
importlist.add("org.apache.pig.builtin.");
- importlist.add("org.apache.pig.impl.builtin.");
+ importlist.add("org.apache.pig.impl.builtin.");
packageImportList.set(importlist);
}
return packageImportList.get();
}
-
+
public static void setPackageImportList(ArrayList<String> list) {
packageImportList.set(list);
}
-
+
public void setLog4jProperties(Properties p)
{
log4jProperties = p;
@@ -742,6 +754,10 @@ public class PigContext implements Seria
return classloader;
}
public static void setClassLoader(ClassLoader cl) {
- classloader = cl;
+ if (cl instanceof ContextClassLoader) {
+ classloader = (ContextClassLoader) cl;
+ } else {
+ classloader = new ContextClassLoader(cl);
+ }
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java?rev=1362181&r1=1362180&r2=1362181&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java Mon Jul 16 18:36:14 2012
@@ -79,6 +79,7 @@ public class TestRegisteredJarVisibility
File[] javaFiles = new File[]{
new File(testResourcesDir, "RegisteredJarVisibilityLoader.java"),
+ new File(testResourcesDir, "ClassLoaderSanityCheck.java"),
new File(testResourcesDir, "RegisteredJarVisibilitySchema.java")};
List<File> classFiles = compile(javaFiles);
@@ -120,7 +121,12 @@ public class TestRegisteredJarVisibility
String query = "register " + jarFile.getAbsolutePath() + ";\n"
+ "a = load '" + INPUT_FILE.getName()
- + "' using org.apache.pig.test.RegisteredJarVisibilityLoader();";
+ + "' using org.apache.pig.test.RegisteredJarVisibilityLoader();\n"
+ // register again to test classloader consistency
+ + "register " + jarFile.getAbsolutePath() + ";\n"
+ + "b = load 'non_existent' "
+ + "using org.apache.pig.test.RegisteredJarVisibilityLoader();";
+
LOG.info("Running pig script:\n" + query);
pigServer.registerScript(new ByteArrayInputStream(query.getBytes()));
Modified: pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java?rev=1362181&r1=1362180&r2=1362181&view=diff
==============================================================================
--- pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java (original)
+++ pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java Mon Jul 16 18:36:14 2012
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.util.UDFContext;
import java.io.IOException;
@@ -33,6 +34,18 @@ public class RegisteredJarVisibilityLoad
private static final Log LOG = LogFactory.getLog(RegisteredJarVisibilityLoader.class);
private static final String REGISTERED_JAR_VISIBILITY_SCHEMA = "registered.jar.visibility.schema";
+ public RegisteredJarVisibilityLoader() throws IOException, ClassNotFoundException {
+ // make sure classes that were visible here and through current
+ // PigContext are the same
+ Class<?> clazz = ClassLoaderSanityCheck.class;
+ Class<?> loadedClass = Class.forName(clazz.getName(), true, PigContext.getClassLoader());
+ if (!clazz.equals(loadedClass)) {
+ throw new RuntimeException("class loader sanity check failed. "
+ + "please make sure jar registration does not result in "
+ + "multiple instances of same classes");
+ }
+ }
+
@Override
public void setLocation(String location, Job job) throws IOException {
UDFContext udfContext = UDFContext.getUDFContext();