You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/25 02:19:57 UTC

svn commit: r698783 - in /incubator/pig/branches/types: ./ lib/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/streaming/ src/or...

Author: olga
Date: Wed Sep 24 17:19:57 2008
New Revision: 698783

URL: http://svn.apache.org/viewvc?rev=698783&view=rev
Log:
PIG-458: move to hadoop 18

Added:
    incubator/pig/branches/types/lib/hadoop18.jar   (with props)
Modified:
    incubator/pig/branches/types/CHANGES.txt
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
    incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java

Modified: incubator/pig/branches/types/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/CHANGES.txt?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/CHANGES.txt (original)
+++ incubator/pig/branches/types/CHANGES.txt Wed Sep 24 17:19:57 2008
@@ -247,3 +247,5 @@
 
     PIG-455: "group" alias is lost after a flatten(group) (pradeepk vi olgan)
 
+    PIG-458: integration with Hadoop 18 (olgan)
+

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Wed Sep 24 17:19:57 2008
@@ -27,7 +27,7 @@
     <property name="dist.dir" value="${build.dir}/${final.name}" />
     <property name="build.encoding" value="ISO-8859-1" />
     <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
-    <property name="hadoop.jarfile" value="hadoop17.jar" />
+    <property name="hadoop.jarfile" value="hadoop18.jar" />
 
     <!-- javac properties -->
     <property name="javac.debug" value="on" />

Added: incubator/pig/branches/types/lib/hadoop18.jar
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/lib/hadoop18.jar?rev=698783&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/pig/branches/types/lib/hadoop18.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/ComparisonFunc.java Wed Sep 24 17:19:57 2008
@@ -33,7 +33,7 @@
     protected PigProgressable reporter;
     
     public ComparisonFunc() {
-        super(NullableTuple.class);
+        super(NullableTuple.class, true);
     }
 
     public int compare(WritableComparable a, WritableComparable b) {

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Sep 24 17:19:57 2008
@@ -46,7 +46,6 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.datastorage.DataStorage;
@@ -89,7 +88,6 @@
     
     protected DataStorage ds;
     
-    protected JobSubmissionProtocol jobTracker;
     protected JobClient jobClient;
 
     // key: the operator key from the logical plan that originated the physical plan
@@ -110,7 +108,6 @@
         this.ds = null;
         
         // to be set in the init method
-        this.jobTracker = null;
         this.jobClient = null;
     }
     
@@ -194,16 +191,6 @@
             
         if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
                 log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
-                if (!LOCAL.equalsIgnoreCase(cluster)) {
-                try {
-                    jobTracker = (JobSubmissionProtocol) RPC.getProxy(
-                            JobSubmissionProtocol.class,
-                            JobSubmissionProtocol.versionID, JobTracker
-                                    .getAddress(configuration), configuration);
-                } catch (IOException e) {
-                    throw new ExecException("Failed to crate job tracker", e);
-                }
-            }
         }
 
         try {

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 24 17:19:57 2008
@@ -332,6 +332,7 @@
                     jobConf.setCombinerClass(PigCombiner.Combine.class);
                     jobConf.set("pig.combinePlan", ObjectSerializer.serialize(mro.combinePlan));
                     jobConf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
+                    jobConf.setCombineOnceOnly(true);
                 } else if (mro.needsDistinctCombiner()) {
                     jobConf.setCombinerClass(DistinctCombiner.Combine.class);
                     log.info("Setting identity combiner class.");

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/Launcher.java Wed Sep 24 17:19:57 2008
@@ -8,6 +8,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
@@ -85,7 +86,7 @@
     }
     
     protected void getStats(Job job, JobClient jobClient) throws IOException{
-        String MRJobID = job.getMapredJobID();
+        JobID MRJobID = job.getAssignedJobID();
         TaskReport[] mapRep = jobClient.getMapTaskReports(MRJobID);
         getErrorMessages(mapRep, "map");
         totalHadoopTimeSpent += computeTimeSpent(mapRep);
@@ -108,7 +109,7 @@
             String msgs[] = reports[i].getDiagnostics();
             for (int j = 0; j < msgs.length; j++) {
                 log.error("Error message from task (" + type + ") " +
-                    reports[i].getTaskId() + msgs[j]);
+                    reports[i].getTaskID() + msgs[j]);
             }
         }
     }
@@ -144,7 +145,7 @@
      * @throws IOException
      */
     protected double progressOfRunningJob(Job j, JobClient jobClient) throws IOException{
-        String mrJobID = j.getMapredJobID();
+        JobID mrJobID = j.getAssignedJobID();
         RunningJob rj = jobClient.getJob(mrJobID);
         if(rj==null && j.getState()==Job.SUCCESS)
             return 1;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Wed Sep 24 17:19:57 2008
@@ -31,6 +31,7 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.LoadFunc;
@@ -120,7 +121,8 @@
     }
 
     public String[] getLocations() throws IOException {
-        BlockLocation[] b = fs.getFileBlockLocations(file, start, length);
+        FileStatus status = fs.getFileStatus(file);
+        BlockLocation[] b = fs.getFileBlockLocations(status, start, length);
         int total = 0;
         for (int i = 0; i < b.length; i++) {
             total += b[i].getHosts().length;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SliceWrapper.java Wed Sep 24 17:19:57 2008
@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -86,8 +87,8 @@
             Set<String> locations = new HashSet<String>();
             for (String loc : wrapped.getLocations()) {
                 Path path = new Path(loc);
-                
-                BlockLocation[] b = fs.getFileBlockLocations(path, 0, fs.getFileStatus(path).getLen());
+                FileStatus status = fs.getFileStatus(path); 
+                BlockLocation[] b = fs.getFileBlockLocations(status, 0, status.getLen());
                 int total = 0;
                 for (int i = 0; i < b.length; i++) {
                     total += b[i].getHosts().length;

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Wed Sep 24 17:19:57 2008
@@ -28,6 +28,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
@@ -169,7 +170,8 @@
             // These are hard-coded begin/end offsets a Hadoop *taskid*
             int beginIndex = 25, endIndex = 31;   
 
-            int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+            //int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+            int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
             return tipId < command.getLogFilesLimit();
         }
         return false;
@@ -246,4 +248,4 @@
     }
 }
 
-    
\ No newline at end of file
+    

Modified: incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java?rev=698783&r1=698782&r2=698783&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/tools/grunt/GruntParser.java Wed Sep 24 17:19:57 2008
@@ -33,6 +33,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.RunningJob;
+import org.apache.hadoop.mapred.JobID;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -276,7 +277,8 @@
     protected void processKill(String jobid) throws IOException
     {
         if (mJobClient != null) {
-            RunningJob job = mJobClient.getJob(jobid);
+            JobID id = JobID.forName(jobid);
+            RunningJob job = mJobClient.getJob(id);
             if (job == null)
                 System.out.println("Job with id " + jobid + " is not active");
             else