You are viewing a plain-text version of this content. The canonical version was linked from the original page; that hyperlink is not preserved in this copy.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/09/18 22:47:00 UTC
svn commit: r696795 - in /incubator/pig/trunk: ./ lib/ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/
src/org/apache...
Author: olga
Date: Thu Sep 18 13:46:59 2008
New Revision: 696795
URL: http://svn.apache.org/viewvc?rev=696795&view=rev
Log:
PIG-253: integration with Hadoop 18
Added:
incubator/pig/trunk/lib/hadoop18.jar (with props)
Modified:
incubator/pig/trunk/build.xml
incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
Modified: incubator/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/build.xml (original)
+++ incubator/pig/trunk/build.xml Thu Sep 18 13:46:59 2008
@@ -58,7 +58,7 @@
<property name="build.javadoc" value="${build.docs}/api" />
<property name="build.encoding" value="ISO-8859-1" />
<!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
- <property name="hadoop.jarfile" value="hadoop17.jar" />
+ <property name="hadoop.jarfile" value="hadoop18.jar" />
<!-- distribution properties -->
<property name="staging.dir" value="${build.dir}/staging"/>
@@ -85,7 +85,7 @@
<property name="junit.hadoop.conf" value="" />
<property name="test.log.dir" value="${basedir}/test/logs"/>
<property name="junit.hadoop.conf" value="${user.home}/pigtest/conf/"/>
- <property name="test.output" value="no"/>
+ <property name="test.output" value="yes"/>
<!-- javadoc properties -->
<property name="javadoc.link.java" value="http://java.sun.com/j2se/1.5.0/docs/api/" />
Added: incubator/pig/trunk/lib/hadoop18.jar
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop18.jar?rev=696795&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/pig/trunk/lib/hadoop18.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java Thu Sep 18 13:46:59 2008
@@ -26,7 +26,7 @@
public abstract class ComparisonFunc extends WritableComparator {
public ComparisonFunc() {
- super(Tuple.class);
+ super(Tuple.class, true);
}
public int compare(WritableComparable a, WritableComparable b) {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HFile.java Thu Sep 18 13:46:59 2008
@@ -75,6 +75,6 @@
public SeekableInputStream sopen() throws IOException {
return new HSeekableInputStream(fs.getHFS().open(path),
- fs.getHFS().getContentLength(path));
+ fs.getHFS(). getContentSummary(path).getLength());
}
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Sep 18 13:46:59 2008
@@ -45,7 +45,6 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.executionengine.ExecException;
@@ -80,7 +79,6 @@
protected DataStorage ds;
- protected JobSubmissionProtocol jobTracker;
protected JobClient jobClient;
// key: the operator key from the logical plan that originated the physical plan
@@ -101,7 +99,6 @@
this.ds = null;
// to be set in the init method
- this.jobTracker = null;
this.jobClient = null;
}
@@ -185,16 +182,6 @@
if(cluster != null && !cluster.equalsIgnoreCase(LOCAL)){
log.info("Connecting to map-reduce job tracker at: " + properties.get(JOB_TRACKER_LOCATION));
- if (!LOCAL.equalsIgnoreCase(cluster)) {
- try {
- jobTracker = (JobSubmissionProtocol) RPC.getProxy(
- JobSubmissionProtocol.class,
- JobSubmissionProtocol.versionID, JobTracker
- .getAddress(configuration), configuration);
- } catch (IOException e) {
- throw new ExecException("Failed to crate job tracker", e);
- }
- }
}
try {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/MapReduceLauncher.java Thu Sep 18 13:46:59 2008
@@ -175,6 +175,9 @@
}
if (pom.toCombine != null) {
conf.set("pig.combineFunc", ObjectSerializer.serialize(pom.toCombine));
+ // this is to make sure that combiner is only called once
+ // since we can't handle no combine or multiple combines
+ conf.setCombineOnceOnly(true);
}
if (pom.groupFuncs != null) {
conf.set("pig.groupFuncs", ObjectSerializer.serialize(pom.groupFuncs));
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Thu Sep 18 13:46:59 2008
@@ -70,7 +70,10 @@
}
}
- index = PigInputFormat.getActiveSplit().getIndex();
+ if (PigInputFormat.getActiveSplit() == null) {
+ } else {
+ index = PigInputFormat.getActiveSplit().getIndex();
+ }
Datum groupName = key.getField(0);
finalout.group = key;
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SliceWrapper.java Thu Sep 18 13:46:59 2008
@@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -90,11 +91,12 @@
Set<String> locations = new HashSet<String>();
for (String loc : wrapped.getLocations()) {
Path path = new Path(loc);
- String hints[][] = fs.getFileCacheHints(path, 0, fs.getFileStatus(
- path).getLen());
- for (int i = 0; i < hints.length; i++) {
- for (int j = 0; j < hints[i].length; j++) {
- locations.add(hints[i][j]);
+ BlockLocation[] blocks = fs.getFileBlockLocations(path, 0, fs.getFileStatus(
+ path).getLen());
+ for (int i = 0; i < blocks.length; i++) {
+ String[] hosts = blocks[i].getHosts();
+ for (int j = 0; j < hosts.length; j++){
+ locations.add(hosts[j]);
}
}
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java?rev=696795&r1=696794&r2=696795&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/streaming/HadoopExecutableManager.java Thu Sep 18 13:46:59 2008
@@ -29,6 +29,7 @@
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskAttemptID;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapreduceExec.PigMapReduce;
import org.apache.pig.impl.eval.collector.DataCollector;
@@ -169,10 +170,7 @@
*/
private boolean writeErrorToHDFS(int limit, String taskId) {
if (command.getPersistStderr()) {
- // These are hard-coded begin/end offsets a Hadoop *taskid*
- int beginIndex = 25, endIndex = 31;
-
- int tipId = Integer.parseInt(taskId.substring(beginIndex, endIndex));
+ int tipId = TaskAttemptID.forName(taskId).getTaskID().getId();
return tipId < command.getLogFilesLimit();
}
return false;
@@ -249,4 +247,4 @@
}
}
-
\ No newline at end of file
+