You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/18 22:47:00 UTC

svn commit: r696795 - in /incubator/pig/trunk: ./ lib/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/datastorage/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/ src/org/apache/pig/backend/hadoop/streaming/

Author: olga
Date: Thu Sep 18 13:46:59 2008
New Revision: 696795

URL: http://svn.apache.org/viewvc?rev=696795&view=rev
Log:
PIG-253: integration with Hadoop 18

Added:
    incubator/pig/trunk/lib/hadoop18.jar   (with props)
Modified:
    incubator/pig/trunk/build.xml
    incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
    incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java

Modified: incubator/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/build.xml (original)
+++ incubator/pig/trunk/build.xml Thu Sep 18 13:46:59 2008
@@ -58,7 +58,7 @@
     <property name="build.javadoc" value="${build.docs}/api" />
     <property name="build.encoding" value="ISO-8859-1" />
     <!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
-    <property name="hadoop.jarfile" value="hadoop17.jar" />
+    <property name="hadoop.jarfile" value="hadoop18.jar" />
 
     <!-- distribution properties -->
     <property name="staging.dir" value="${build.dir}/staging"/>
@@ -85,7 +85,7 @@
     <property name="junit.hadoop.conf" value="" />
     <property name="test.log.dir" value="${basedir}/test/logs"/>
     <property name="junit.hadoop.conf" value="${user.home}/pigtest/conf/"/>
-    <property name="test.output" value="no"/>
+    <property name="test.output" value="yes"/>
 	
 	<!-- javadoc properties  -->
 	<property name="javadoc.link.java" value="http://java.sun.com/j2se/1.5.0/docs/api/" />

Added: incubator/pig/trunk/lib/hadoop18.jar
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop18.jar?rev=696795&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/pig/trunk/lib/hadoop18.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Modified: incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java Thu Sep 18 13:46:59 2008
@@ -26,7 +26,7 @@
 
 public abstract class ComparisonFunc extends WritableComparator {
     public ComparisonFunc() {
-        super(Tuple.class);
+        super(Tuple.class, true);
     }
 
     public int compare(WritableComparable a, WritableComparable b) {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java Thu Sep 18 13:46:59 2008
@@ -75,6 +75,6 @@
     
     public SeekableInputStream sopen() throws IOException {
         return new HSeekableInputStream(fs.getHFS().open(path),
-                                        fs.getHFS().getContentLength(path));
+                                        fs.getHFS(). getContentSummary(path).getLength());
     }
 }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Sep 18 13:46:59 2008
@@ -45,7 +45,6 @@
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
 import org.apache.hadoop.mapred.JobTracker;
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -80,7 +79,6 @@
     
     protected DataStorage ds;
     
-    protected JobSubmissionProtocol jobTracker;
     protected JobClient jobClient;
 
     // key: the operator key from the logical plan that originated the physical plan
@@ -101,7 +99,6 @@
         this.ds = null;
         
         // to be set in the init method
-        this.jobTracker = null;
         this.jobClient = null;
     }
     
@@ -185,16 +182,6 @@
             
         if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
                 log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
-                if (!LOCAL.equalsIgnoreCase(cluster)) {
-                try {
-                    jobTracker = (JobSubmissionProtocol) RPC.getProxy(
-                            JobSubmissionProtocol.class,
-                            JobSubmissionProtocol.versionID, JobTracker
-                                    .getAddress(configuration), configuration);
-                } catch (IOException e) {
-                    throw new ExecException("Failed to crate job tracker", e);
-                }
-            }
         }
 
         try {

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu Sep 18 13:46:59 2008
@@ -175,6 +175,9 @@
             }
             if (pom.toCombine != null) {
                 conf.set("pig.combineFunc", ObjectSerializer.serialize(pom.toCombine));
+                // this is to make sure that combiner is only called once
+                // since we can't handle no combine or multiple combines
+                conf.setCombineOnceOnly(true);
             }
             if (pom.groupFuncs != null) {
                 conf.set("pig.groupFuncs", ObjectSerializer.serialize(pom.groupFuncs));

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Thu Sep 18 13:46:59 2008
@@ -70,7 +70,10 @@
                 }
             }
 
-            index = PigInputFormat.getActiveSplit().getIndex();
+            if (PigInputFormat.getActiveSplit() == null) {
+            } else {
+                index = PigInputFormat.getActiveSplit().getIndex();
+            }
 
             Datum groupName = key.getField(0);
             finalout.group = key;

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java Thu Sep 18 13:46:59 2008
@@ -31,6 +31,7 @@
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -90,11 +91,12 @@
         Set<String> locations = new HashSet<String>();
         for (String loc : wrapped.getLocations()) {
             Path path = new Path(loc);
-            String hints[][] = fs.getFileCacheHints(path, 0, fs.getFileStatus(
-                    path).getLen());
-            for (int i = 0; i < hints.length; i++) {
-                for (int j = 0; j < hints[i].length; j++) {
-                    locations.add(hints[i][j]);
+            BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, fs.getFileStatus(
+                                            path).getLen());
+            for (int i = 0; i < blocks.length; i++) {
+                String[] hosts = blocks[i].getHosts();
+                for (int j = 0; j < hosts.length; j++){
+                    locations.add(hosts[j]);
                 }
             }
         }

Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Sep 18 13:46:59 2008
@@ -29,6 +29,7 @@
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
 import org.apache.pig.impl.eval.collector.DataCollector;
@@ -169,10 +170,7 @@
      */
     private boolean writeErrorToHDFS(int limit, String taskId) {
         if (command.getPersistStderr()) {
-            // These are hard-coded begin/end offsets a Hadoop *taskid*
-            int beginIndex = 25, endIndex = 31;   
-
-            int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+            int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
             return tipId < command.getLogFilesLimit();
         }
         return false;
@@ -249,4 +247,4 @@
     }
 }
 
-    
\ No newline at end of file
+