Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/16 21:36:40 UTC

svn commit: r910677 - in /hadoop/pig/branches/branch-0.6: ./ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/tools/grunt/

Author: pradeepkth
Date: Tue Feb 16 20:36:40 2010
New Revision: 910677

URL: http://svn.apache.org/viewvc?rev=910677&view=rev
Log:
PIG-1239: PigContext.connect() should not create a jobClient and jobClient should be created on demand when needed (pradeepkth)
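
The intent of the change, as a minimal sketch (a hypothetical class, assuming the Hadoop 0.20 mapred API; the actual fields and call sites are in the diffs below): the execution engine keeps only a JobConf, and a JobClient is constructed on demand at the point of use rather than eagerly in PigContext.connect().

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobID;
    import org.apache.hadoop.mapred.RunningJob;

    // Hypothetical class illustrating the on-demand JobClient pattern;
    // the real fields and methods are in the HExecutionEngine and
    // GruntParser hunks below.
    class LazyJobClientSketch {
        private JobConf jobConf;   // built once, when the engine is initialized

        void init(Configuration configuration) {
            // No JobClient is created up front any more -- only the JobConf.
            jobConf = new JobConf(configuration);
        }

        JobConf getJobConf() {
            return jobConf;
        }

        void kill(String jobid) throws IOException {
            // The JobClient is created only at the point of use.
            JobClient jc = new JobClient(jobConf);
            RunningJob job = jc.getJob(JobID.forName(jobid));
            if (job != null) {
                job.killJob();
            }
        }
    }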

Modified:
    hadoop/pig/branches/branch-0.6/CHANGES.txt
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java

Modified: hadoop/pig/branches/branch-0.6/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/CHANGES.txt?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.6/CHANGES.txt Tue Feb 16 20:36:40 2010
@@ -151,6 +151,9 @@
 
 BUG FIXES
 
+PIG-1239: PigContext.connect() should not create a jobClient and jobClient
+should be created on demand when needed (pradeepkth)
+
 PIG-1213: Schema serialization is broken (pradeepkth)
 
 PIG-1191: POCast throws exception for certain sequences of LOAD, FILTER,

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Feb 16 20:36:40 2010
@@ -90,7 +90,8 @@
     
     protected DataStorage ds;
     
-    protected JobClient jobClient;
+    @SuppressWarnings("deprecation")
+    protected JobConf jobConf;
 
     // key: the operator key from the logical plan that originated the physical plan
     // val: the operator key for the root of the phyisical plan
@@ -110,11 +111,12 @@
         this.ds = null;
         
         // to be set in the init method
-        this.jobClient = null;
+        this.jobConf = null;
     }
     
-    public JobClient getJobClient() {
-        return this.jobClient;
+    @SuppressWarnings("deprecation")
+    public JobConf getJobConf() {
+        return this.jobConf;
     }
     
     public Map<OperatorKey, MapRedResult> getMaterializedResults() {
@@ -134,6 +136,7 @@
         init(this.pigContext.getProperties());
     }
     
+    @SuppressWarnings("deprecation")
     public void init(Properties properties) throws ExecException {
         //First set the ssh socket factory
         setSSHFactory();
@@ -155,13 +158,13 @@
         // Now add the settings from "properties" object to override any existing properties
         // All of the above is accomplished in the method call below
            
-        JobConf jobConf = new JobConf();
-        jobConf.addResource("pig-cluster-hadoop-site.xml");
+        JobConf jc = new JobConf();
+        jc.addResource("pig-cluster-hadoop-site.xml");
             
         //the method below alters the properties object by overriding the
         //hadoop properties with the values from properties and recomputing
         //the properties
-        recomputeProperties(jobConf, properties);
+        recomputeProperties(jc, properties);
             
         configuration = ConfigurationUtil.toConfiguration(properties);            
         properties = ConfigurationUtil.toProperties(configuration);
@@ -193,15 +196,8 @@
                 log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
         }
 
-        try {
-            // Set job-specific configuration knobs
-            jobClient = new JobClient(new JobConf(configuration));
-        }
-        catch (IOException e) {
-            int errCode = 6009;
-            String msg = "Failed to create job client:" + e.getMessage();
-            throw new ExecException(msg, errCode, PigException.BUG, e);
-        }
+        // Set job-specific configuration knobs
+        jobConf = new JobConf(configuration);
     }
 
     public Properties getConfiguration() throws ExecException {
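
With this hunk the IOException that init() previously caught while constructing the JobClient can no longer occur at connect time; it can only surface wherever a caller later builds a JobClient from getJobConf(), as MapReduceLauncher and GruntParser do below. A hedged caller-side sketch (the try/catch and error code are illustrative, carried over from the removed block, and are not part of this commit):

    // Illustrative call site: construct the JobClient on demand and
    // wrap the IOException the old init() used to handle (sketch only).
    JobClient jobClient;
    try {
        jobClient = new JobClient(((HExecutionEngine) exe).getJobConf());
    } catch (IOException e) {
        int errCode = 6009;
        String msg = "Failed to create job client:" + e.getMessage();
        throw new ExecException(msg, errCode, PigException.BUG, e);
    }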

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Tue Feb 16 20:36:40 2010
@@ -1878,7 +1878,8 @@
      * @throws PlanException
      * @throws VisitorException
      */
-  	protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
+  	@SuppressWarnings("deprecation")
+    protected Pair<MapReduceOper,Integer> getSamplingJob(POSort sort, MapReduceOper prevJob, List<PhysicalPlan> transformPlans,
   			FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans, 
   			String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
   		
@@ -2053,7 +2054,7 @@
                     if(val<=0)
                         val = pigContext.defaultParallel;
                     if (val<=0)
-                        val = ((JobConf)((HExecutionEngine)eng).getJobClient().getConf()).getNumReduceTasks();
+                        val = ((JobConf)((HExecutionEngine)eng).getJobConf()).getNumReduceTasks();
                     if (val<=0)
                         val = 1;
                 } catch (Exception e) {

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Tue Feb 16 20:36:40 2010
@@ -109,7 +109,7 @@
         ExecutionEngine exe = pc.getExecutionEngine();
         ConfigurationValidator.validatePigProperties(exe.getConfiguration());
         Configuration conf = ConfigurationUtil.toConfiguration(exe.getConfiguration());
-        JobClient jobClient = ((HExecutionEngine)exe).getJobClient();
+        JobClient jobClient = new JobClient(((HExecutionEngine)exe).getJobConf());
 
         JobControlCompiler jcc = new JobControlCompiler(pc, conf);
         

Modified: hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java?rev=910677&r1=910676&r2=910677&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/branches/branch-0.6/src/org/apache/pig/tools/grunt/GruntParser.java Tue Feb 16 20:36:40 2010
@@ -49,6 +49,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FsShell;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.pig.FuncSpec;
@@ -71,6 +72,7 @@
 import org.apache.pig.tools.parameters.ParameterSubstitutionPreprocessor;
 import org.apache.pig.impl.util.LogUtils;
 
+@SuppressWarnings("deprecation")
 public class GruntParser extends PigScriptParser {
 
     private final Log log = LogFactory.getLog(getClass());
@@ -203,13 +205,14 @@
         //
         ExecutionEngine execEngine = mPigServer.getPigContext().getExecutionEngine();
         if (execEngine instanceof HExecutionEngine) {
-            mJobClient = ((HExecutionEngine)execEngine).getJobClient();
+            mJobConf = ((HExecutionEngine)execEngine).getJobConf();
         }
         else {
-            mJobClient = null;
+            mJobConf = null;
         }
     }
 
+    @Override
     public void prompt()
     {
         if (mInteractive) {
@@ -217,6 +220,7 @@
         }
     }
     
+    @Override
     protected void quit()
     {
         mDone = true;
@@ -226,6 +230,7 @@
         return mDone;
     }
     
+    @Override
     protected void processDescribe(String alias) throws IOException {
         if(alias==null) {
             alias = mPigServer.getPigContext().getLastAlias();
@@ -233,6 +238,7 @@
         mPigServer.dumpSchema(alias);
     }
 
+    @Override
     protected void processExplain(String alias, String script, boolean isVerbose, 
                                   String format, String target, 
                                   List<String> params, List<String> files) 
@@ -317,10 +323,12 @@
         }
     }
 
+    @Override
     protected void printAliases() throws IOException {
         mPigServer.printAliases();
     }
     
+    @Override
     protected void processRegister(String jar) throws IOException {
         mPigServer.registerJar(jar);
     }
@@ -344,6 +352,7 @@
         return writer.toString();
     }
 
+    @Override
     protected void processScript(String script, boolean batch, 
                                  List<String> params, List<String> files) 
         throws IOException, ParseException {
@@ -416,6 +425,7 @@
         }
     }
 
+    @Override
     protected void processSet(String key, String value) throws IOException, ParseException {
         if (key.equals("debug"))
         {
@@ -460,6 +470,7 @@
         }
     }
     
+    @Override
     protected void processCat(String path) throws IOException
     {
         executeBatch();
@@ -503,6 +514,7 @@
         }
     }
 
+    @Override
     protected void processCD(String path) throws IOException
     {    
         ContainerDescriptor container;
@@ -534,6 +546,7 @@
         }
     }
 
+    @Override
     protected void processDump(String alias) throws IOException
     {
         Iterator<Tuple> result = mPigServer.openIterator(alias);
@@ -544,16 +557,19 @@
         }
     }
     
+    @Override
     protected void processIllustrate(String alias) throws IOException
     {
 	mPigServer.getExamples(alias);
     }
 
+    @Override
     protected void processKill(String jobid) throws IOException
     {
-        if (mJobClient != null) {
+        if (mJobConf != null) {
+            JobClient jc = new JobClient(mJobConf);
             JobID id = JobID.forName(jobid);
-            RunningJob job = mJobClient.getJob(id);
+            RunningJob job = jc.getJob(id);
             if (job == null)
                 System.out.println("Job with id " + jobid + " is not active");
             else
@@ -564,6 +580,7 @@
         }
     }
         
+    @Override
     protected void processLS(String path) throws IOException
     {
         try {
@@ -613,11 +630,13 @@
         System.out.println(elem.toString() + "<r " + replication + ">\t" + len);
     }
     
+    @Override
     protected void processPWD() throws IOException 
     {
         System.out.println(mDfs.getActiveContainer().toString());
     }
 
+    @Override
     protected void printHelp() 
     {
         System.out.println("Commands:");
@@ -636,6 +655,7 @@
         System.out.println("quit");
     }
 
+    @Override
     protected void processMove(String src, String dst) throws IOException
     {
         executeBatch();
@@ -655,6 +675,7 @@
         }
     }
     
+    @Override
     protected void processCopy(String src, String dst) throws IOException
     {
         executeBatch();
@@ -670,6 +691,7 @@
         }
     }
     
+    @Override
     protected void processCopyToLocal(String src, String dst) throws IOException
     {
         executeBatch();
@@ -685,6 +707,7 @@
         }
     }
 
+    @Override
     protected void processCopyFromLocal(String src, String dst) throws IOException
     {
         executeBatch();
@@ -700,12 +723,14 @@
         }
     }
     
+    @Override
     protected void processMkdir(String dir) throws IOException
     {
         ContainerDescriptor dirDescriptor = mDfs.asContainer(dir);
         dirDescriptor.create();
     }
     
+    @Override
     protected void processPig(String cmd) throws IOException
     {
         int start = 1;
@@ -721,6 +746,7 @@
         }
     }
 
+    @Override
     protected void processRemove(String path, String options ) throws IOException
     {
         ElementDescriptor dfsPath = mDfs.asElement(path);
@@ -738,6 +764,7 @@
         }
     }
 
+    @Override
     protected void processFsCommand(String[] cmdTokens) throws IOException{
         try {
             shell.run(cmdTokens);
@@ -773,7 +800,7 @@
     private DataStorage mDfs;
     private DataStorage mLfs;
     private Properties mConf;
-    private JobClient mJobClient;
+    private JobConf mJobConf;
     private boolean mDone;
     private boolean mLoadOnly;
     private ExplainState mExplain;