You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this text copy.)
Posted to commits@pig.apache.org by jc...@apache.org on 2012/07/16 20:36:15 UTC

svn commit: r1362181 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/PigContext.java test/org/apache/pig/test/TestRegisteredJarVisibility.java test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java

Author: jcoveney
Date: Mon Jul 16 18:36:14 2012
New Revision: 1362181

URL: http://svn.apache.org/viewvc?rev=1362181&view=rev
Log:
PIG-2815: class loader management in PigContext (rangadi via jcoveney)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/PigContext.java
    pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java
    pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1362181&r1=1362180&r2=1362181&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jul 16 18:36:14 2012
@@ -194,6 +194,8 @@ OPTIMIZATIONS
 
 BUG FIXES
 
+PIG-2815: class loader management in PigContext (rangadi via jcoveney)
+
 PIG-2813: Fix test regressions from PIG-2632 (jcoveney)
 
 PIG-2806: Fix merge join test regression from PIG-2632 (jcoveney)

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1362181&r1=1362180&r2=1362181&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Mon Jul 16 18:36:14 2012
@@ -58,85 +58,104 @@ import org.apache.pig.impl.util.JarManag
 
 public class PigContext implements Serializable {
     private static final long serialVersionUID = 1L;
-    
+
     private static final Log log = LogFactory.getLog(PigContext.class);
-    
+
     public static final String JOB_NAME = "jobName";
     public static final String JOB_NAME_PREFIX= "PigLatin";
     public static final String JOB_PRIORITY = "jobPriority";
     public static final String PIG_CMD_ARGS_REMAINDERS = "pig.cmd.args.remainders";
-    
-    
-    /* NOTE: we only serialize some of the stuff 
-     * 
-     *(to make it smaller given that it's not all needed on the Hadoop side, 
+
+
+    /* NOTE: we only serialize some of the stuff
+     *
+     *(to make it smaller given that it's not all needed on the Hadoop side,
      * and also because some is not serializable e.g. the Configuration)
      */
-    
+
     //one of: local, mapreduce, pigbody
-    private ExecType execType;;    
+    private ExecType execType;;
 
     //  extra jar files that are needed to run a job
-    transient public List<URL> extraJars = new LinkedList<URL>();              
-    
+    transient public List<URL> extraJars = new LinkedList<URL>();
+
     //  The jars that should not be merged in. (Some functions may come from pig.jar and we don't want the whole jar file.)
-    transient public Vector<String> skipJars = new Vector<String>(2);    
-    
+    transient public Vector<String> skipJars = new Vector<String>(2);
+
     //main file system that jobs and shell commands access
-    transient private DataStorage dfs;                         
-    
+    transient private DataStorage dfs;
+
     //  local file system, where jar files, etc. reside
-    transient private DataStorage lfs;                         
-    
+    transient private DataStorage lfs;
+
     // handle to the back-end
     transient private HExecutionEngine executionEngine;
-   
+
     private Properties properties;
-    
+
     //  script files that are needed to run a job
     @Deprecated
     public List<String> scriptFiles = new ArrayList<String>();
     private Map<String,File> aliasedScriptFiles = new LinkedHashMap<String,File>();
-    
+
     //  script jars that are needed to run a script - jython.jar etc
     public List<String> scriptJars = new ArrayList<String>(2);
-    
+
     /**
      * a table mapping function names to function specs.
      */
     private Map<String, FuncSpec> definedFunctions = new HashMap<String, FuncSpec>();
-    
+
     /**
      * a table mapping names to streaming commands.
      */
-    private Map<String, StreamingCommand> definedCommands = 
+    private Map<String, StreamingCommand> definedCommands =
         new HashMap<String, StreamingCommand>();
 
-    private static ThreadLocal<ArrayList<String>> packageImportList = 
+    private static ThreadLocal<ArrayList<String>> packageImportList =
         new ThreadLocal<ArrayList<String>>();
 
     private Properties log4jProperties = new Properties();
-    
-    private Level defaultLogLevel; 
-    
+
+    private Level defaultLogLevel;
+
     public int defaultParallel = -1;
 
     // Says, whether we're processing an explain right now. Explain
     // might skip some check in the logical plan validation (file
     // existence checks, etc).
     public boolean inExplain = false;
-    
+
     // whether we're processing an ILLUSTRATE right now.
     public boolean inIllustrator = false;
-    
+
     private String last_alias = null;
 
     // List of paths skipped for automatic shipping
     List<String> skippedShipPaths = new ArrayList<String>();
 
-    static private ClassLoader classloader = PigContext.class.getClassLoader();
-    
-    private static Map<String, Class> classCache = new HashMap<String, Class>();;
+    /**
+     * extends URLClassLoader to allow adding to classpath as new jars
+     * are registered.
+     */
+    private static class ContextClassLoader extends URLClassLoader {
+
+        public ContextClassLoader(ClassLoader classLoader) {
+            this(new URL[0], classLoader);
+        }
+
+        public ContextClassLoader(URL[] urls, ClassLoader classLoader) {
+            super(urls, classLoader);
+        }
+
+        @Override
+        public void addURL(URL url) {
+            super.addURL(url);
+        }
+    };
+
+    static private ContextClassLoader classloader = new ContextClassLoader(PigContext.class.getClassLoader());
+
 
     private List<String> params;
     public List<String> getParams() {
@@ -155,14 +174,14 @@ public class PigContext implements Seria
         this.paramFiles = paramFiles;
     }
     private List<String> paramFiles;
-    
+
     public PigContext() {
         this(ExecType.MAPREDUCE, new Properties());
     }
-        
+
     public PigContext(ExecType execType, Properties properties){
         this.execType = execType;
-        this.properties = properties;   
+        this.properties = properties;
 
         this.properties.setProperty("exectype", this.execType.name());
         String pigJar = JarManager.findContainingJar(Main.class);
@@ -172,9 +191,9 @@ public class PigContext implements Seria
             if (!pigJar.equals(hadoopJar))
                 skipJars.add(hadoopJar);
         }
-        
+
         executionEngine = null;
-        
+
         // Add the default paths to be skipped for auto-shipping of commands
         skippedShipPaths.add("/bin");
         skippedShipPaths.add("/usr/bin");
@@ -183,7 +202,7 @@ public class PigContext implements Seria
         skippedShipPaths.add("/usr/sbin");
         skippedShipPaths.add("/usr/local/sbin");
     }
-    
+
     public static void initializeImportList(String importListCommandLineProperties)
     {
         StringTokenizer tokenizer = new StringTokenizer(importListCommandLineProperties, ":");
@@ -198,7 +217,7 @@ public class PigContext implements Seria
             pos++;
         }
     }
-    
+
     public void connect() throws ExecException {
 
         switch (execType) {
@@ -208,14 +227,14 @@ public class PigContext implements Seria
                 executionEngine = new HExecutionEngine (this);
 
                 executionEngine.init();
-                
+
                 dfs = executionEngine.getDataStorage();
-                
+
                 lfs = new HDataStorage(URI.create("file:///"),
-                                        properties); 
+                                        properties);
             }
             break;
-            
+
             default:
             {
                 int errCode = 2040;
@@ -229,7 +248,7 @@ public class PigContext implements Seria
     public void setJobtrackerLocation(String newLocation) {
         Properties trackerLocation = new Properties();
         trackerLocation.setProperty("mapred.job.tracker", newLocation);
-        
+
         try {
             executionEngine.updateConfiguration(trackerLocation);
         }
@@ -260,18 +279,18 @@ public class PigContext implements Seria
             aliasedScriptFiles.put(name.replaceFirst("^/", ""), new File(path));
         }
     }
-    
+
     public void addJar(String path) throws MalformedURLException {
         if (path != null) {
             URL resource = (new File(path)).toURI().toURL();
             addJar(resource);
         }
     }
-    
+
     public void addJar(URL resource) throws MalformedURLException{
         if (resource != null) {
             extraJars.add(resource);
-            PigContext.classloader = createCl(null);
+            classloader.addURL(resource);
             Thread.currentThread().setContextClassLoader(PigContext.classloader);
         }
     }
@@ -288,18 +307,18 @@ public class PigContext implements Seria
         if (oldName.equals(newName)) {
             return;
         }
-        
+
         System.out.println("Renaming " + oldName + " to " + newName);
 
         ElementDescriptor dst = null;
-        ElementDescriptor src = null;            
+        ElementDescriptor src = null;
 
         try {
             dst = dfs.asElement(newName);
-            src = dfs.asElement(oldName);            
+            src = dfs.asElement(oldName);
         }
         catch (DataStorageException e) {
-            byte errSrc = getErrorSource();            
+            byte errSrc = getErrorSource();
             int errCode = 0;
             switch(errSrc) {
             case PigException.REMOTE_ENVIRONMENT:
@@ -312,25 +331,25 @@ public class PigContext implements Seria
                 errCode = 2038;
                     break;
             }
-            String msg = "Unable to rename " + oldName + " to " + newName;            
+            String msg = "Unable to rename " + oldName + " to " + newName;
             throw new ExecException(msg, errCode, errSrc, e);
         }
 
         if (dst.exists()) {
             dst.delete();
         }
-        
+
         src.rename(dst);
 
     }
 
     public void copy(String src, String dst, boolean localDst) throws IOException {
         DataStorage dstStorage = dfs;
-        
+
         if (localDst) {
             dstStorage = lfs;
         }
-        
+
         ElementDescriptor srcElement = null;
         ElementDescriptor dstElement = null;
 
@@ -339,7 +358,7 @@ public class PigContext implements Seria
             dstElement = dstStorage.asElement(dst);
         }
         catch (DataStorageException e) {
-            byte errSrc = getErrorSource();            
+            byte errSrc = getErrorSource();
             int errCode = 0;
             switch(errSrc) {
             case PigException.REMOTE_ENVIRONMENT:
@@ -352,13 +371,13 @@ public class PigContext implements Seria
                 errCode = 2039;
                     break;
             }
-            String msg = "Unable to copy " + src + " to " + dst;            
+            String msg = "Unable to copy " + src + " to " + dst;
             throw new ExecException(msg, errCode, errSrc, e);
         }
-        
+
         srcElement.copy(dstElement, this.properties, false);
     }
-    
+
     public HExecutionEngine getExecutionEngine() {
         return executionEngine;
     }
@@ -374,17 +393,17 @@ public class PigContext implements Seria
     public DataStorage getFs() {
         return dfs;
     }
-    
+
     /**
      * Provides configuration information.
-     * 
+     *
      * @return - information about the configuration used to connect to
      *         execution engine
      */
     public Properties getProperties() {
         return this.properties;
     }
-    
+
     /**
      * @deprecated use {@link #getProperties()} instead
      */
@@ -403,13 +422,13 @@ public class PigContext implements Seria
 
     /**
      * Defines an alias for the given function spec. This
-     * is useful for functions that require arguments to the 
+     * is useful for functions that require arguments to the
      * constructor.
-     * 
+     *
      * @param function - the new function alias to define.
-     * @param functionSpec - the FuncSpec object representing the name of 
+     * @param functionSpec - the FuncSpec object representing the name of
      * the function class and any arguments to constructor.
-     * 
+     *
      */
     public void registerFunction(String function, FuncSpec functionSpec) {
         if (functionSpec == null) {
@@ -421,11 +440,11 @@ public class PigContext implements Seria
 
     /**
      * Defines an alias for the given streaming command.
-     * 
+     *
      * This is useful for complicated streaming command specs.
-     * 
+     *
      * @param alias - the new command alias to define.
-     * @param command - the command 
+     * @param command - the command
      */
     public void registerStreamCmd(String alias, StreamingCommand command) {
         if (command == null) {
@@ -437,19 +456,16 @@ public class PigContext implements Seria
 
     /**
      * Returns the type of execution currently in effect.
-     * 
+     *
      * @return current execution type
      */
     public ExecType getExecType() {
         return execType;
     }
-    
-    
-    
 
     /**
      * Creates a Classloader based on the passed jarFile and any extra jar files.
-     * 
+     *
      * @param jarFile
      *            the jar file to be part of the newly created Classloader. This jar file plus any
      *            jars in the extraJars list will constitute the classpath.
@@ -467,30 +483,26 @@ public class PigContext implements Seria
             urls[i + passedJar] = extraJars.get(i);
         }
         //return new URLClassLoader(urls, PigMapReduce.class.getClassLoader());
-        return new URLClassLoader(urls, PigContext.class.getClassLoader());
+        return new ContextClassLoader(urls, PigContext.class.getClassLoader());
     }
-        
+
     @SuppressWarnings("rawtypes")
     public static Class resolveClassName(String name) throws IOException{
-        if (classCache.containsKey(name)) {
-            return classCache.get(name);
-        }
         for(String prefix: getPackageImportList()) {
             Class c;
             try {
                 c = Class.forName(prefix+name,true, PigContext.classloader);
-                classCache.put(name, c);
                 return c;
-            } 
+            }
             catch (ClassNotFoundException e) {
                 // do nothing
-            } 
+            }
             catch (UnsupportedClassVersionError e) {
                 int errCode = 1069;
                 String msg = "Problem resolving class version numbers for class " + name;
                 throw new ExecException(msg, errCode, PigException.INPUT, e) ;
             }
-            
+
         }
 
         // create ClassNotFoundException exception and attach to IOException
@@ -541,7 +553,7 @@ public class PigContext implements Seria
     @SuppressWarnings({ "unchecked", "rawtypes" })
     public static Object instantiateFuncFromSpec(FuncSpec funcSpec)  {
         Object ret;
-        String className =funcSpec.getClassName(); 
+        String className =funcSpec.getClassName();
         String[] args = funcSpec.getCtorArgs();
         Class objClass = null ;
 
@@ -596,11 +608,11 @@ public class PigContext implements Seria
         }
         return ret;
     }
-    
+
     public static Object instantiateFuncFromSpec(String funcSpec)  {
         return instantiateFuncFromSpec(new FuncSpec(funcSpec));
-    }    
-    
+    }
+
     @SuppressWarnings("rawtypes")
     public Class getClassForAlias(String alias) throws IOException{
         String className = null;
@@ -615,7 +627,7 @@ public class PigContext implements Seria
         }
         return resolveClassName(className);
     }
-  
+
     public Object instantiateFuncFromAlias(String alias) throws IOException {
         FuncSpec funcSpec;
         if (definedFunctions != null && (funcSpec = definedFunctions.get(alias))!=null)
@@ -626,21 +638,21 @@ public class PigContext implements Seria
 
     /**
      * Get the {@link StreamingCommand} for the given alias.
-     * 
+     *
      * @param alias the alias for the <code>StreamingCommand</code>
      * @return <code>StreamingCommand</code> for the alias
      */
     public StreamingCommand getCommandForAlias(String alias) {
         return definedCommands.get(alias);
     }
-    
+
     public void setExecType(ExecType execType) {
         this.execType = execType;
     }
-    
+
     /**
      * Create a new {@link ExecutableManager} depending on the ExecType.
-     * 
+     *
      * @return a new {@link ExecutableManager} depending on the ExecType
      * @throws ExecException
      */
@@ -649,7 +661,7 @@ public class PigContext implements Seria
 
         switch (execType) {
             case LOCAL:
-            case MAPREDUCE: 
+            case MAPREDUCE:
             {
                 executableManager = new HadoopExecutableManager();
             }
@@ -661,7 +673,7 @@ public class PigContext implements Seria
                 throw new ExecException(msg, errCode, PigException.BUG);
             }
         }
-        
+
         return executableManager;
     }
 
@@ -674,29 +686,29 @@ public class PigContext implements Seria
     }
 
     /**
-     * Add a path to be skipped while automatically shipping binaries for 
+     * Add a path to be skipped while automatically shipping binaries for
      * streaming.
-     *  
+     *
      * @param path path to be skipped
      */
     public void addPathToSkip(String path) {
         skippedShipPaths.add(path);
     }
-    
+
     /**
      * Get paths which are to skipped while automatically shipping binaries for
      * streaming.
-     * 
-     * @return paths which are to skipped while automatically shipping binaries 
+     *
+     * @return paths which are to skipped while automatically shipping binaries
      *         for streaming
      */
     public List<String> getPathsToSkip() {
         return skippedShipPaths;
     }
-    
+
     /**
      * Check the execution mode and return the appropriate error source
-     * 
+     *
      * @return error source
      */
     public byte getErrorSource() {
@@ -704,24 +716,24 @@ public class PigContext implements Seria
             return PigException.REMOTE_ENVIRONMENT;
         } else {
             return PigException.BUG;
-        }        
+        }
     }
-    
+
     public static ArrayList<String> getPackageImportList() {
         if (packageImportList.get() == null) {
             ArrayList<String> importlist = new ArrayList<String>();
             importlist.add("");
             importlist.add("org.apache.pig.builtin.");
-            importlist.add("org.apache.pig.impl.builtin.");  
+            importlist.add("org.apache.pig.impl.builtin.");
             packageImportList.set(importlist);
         }
         return packageImportList.get();
     }
-    
+
     public static void setPackageImportList(ArrayList<String> list) {
         packageImportList.set(list);
     }
-    
+
     public void setLog4jProperties(Properties p)
     {
         log4jProperties = p;
@@ -742,6 +754,10 @@ public class PigContext implements Seria
         return classloader;
     }
     public static void setClassLoader(ClassLoader cl) {
-        classloader = cl;
+        if (cl instanceof ContextClassLoader) {
+            classloader = (ContextClassLoader) cl;
+        } else {
+            classloader = new ContextClassLoader(cl);
+        }
     }
 }

Modified: pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java?rev=1362181&r1=1362180&r2=1362181&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestRegisteredJarVisibility.java Mon Jul 16 18:36:14 2012
@@ -79,6 +79,7 @@ public class TestRegisteredJarVisibility
 
         File[] javaFiles = new File[]{
                 new File(testResourcesDir, "RegisteredJarVisibilityLoader.java"),
+                new File(testResourcesDir, "ClassLoaderSanityCheck.java"),
                 new File(testResourcesDir, "RegisteredJarVisibilitySchema.java")};
 
         List<File> classFiles = compile(javaFiles);
@@ -120,7 +121,12 @@ public class TestRegisteredJarVisibility
 
         String query = "register " + jarFile.getAbsolutePath() + ";\n"
                 + "a = load '" + INPUT_FILE.getName()
-                + "' using org.apache.pig.test.RegisteredJarVisibilityLoader();";
+                + "' using org.apache.pig.test.RegisteredJarVisibilityLoader();\n"
+                // register again to test classloader consistency
+                + "register " +  jarFile.getAbsolutePath() + ";\n"
+                + "b = load 'non_existent' "
+                + "using org.apache.pig.test.RegisteredJarVisibilityLoader();";
+
         LOG.info("Running pig script:\n" + query);
         pigServer.registerScript(new ByteArrayInputStream(query.getBytes()));
 

Modified: pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java?rev=1362181&r1=1362180&r2=1362181&view=diff
==============================================================================
--- pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java (original)
+++ pig/trunk/test/resources/org/apache/pig/test/RegisteredJarVisibilityLoader.java Mon Jul 16 18:36:14 2012
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.util.UDFContext;
 
 import java.io.IOException;
@@ -33,6 +34,18 @@ public class RegisteredJarVisibilityLoad
     private static final Log LOG = LogFactory.getLog(RegisteredJarVisibilityLoader.class);
     private static final String REGISTERED_JAR_VISIBILITY_SCHEMA = "registered.jar.visibility.schema";
 
+    public RegisteredJarVisibilityLoader() throws IOException, ClassNotFoundException {
+        // make sure classes that were visible here and through current
+        // PigContext are the same
+        Class<?> clazz = ClassLoaderSanityCheck.class;
+        Class<?> loadedClass = Class.forName(clazz.getName(), true, PigContext.getClassLoader());
+        if (!clazz.equals(loadedClass)) {
+            throw new RuntimeException("class loader sanity check failed. "
+                    + "please make sure jar registration does not result in "
+                    + "multiple instances of same classes");
+        }
+    }
+
     @Override
     public void setLocation(String location, Job job) throws IOException {
         UDFContext udfContext = UDFContext.getUDFContext();