You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/05/22 21:36:31 UTC
svn commit: r659223 - in /incubator/pig/trunk: ./ lib/
src/org/apache/pig/backend/datastorage/
src/org/apache/pig/backend/executionengine/
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/...
Author: olga
Date: Thu May 22 12:36:29 2008
New Revision: 659223
URL: http://svn.apache.org/viewvc?rev=659223&view=rev
Log:
PIG-198: integration with hadoop 0.17
Added:
incubator/pig/trunk/lib/hadoop17.jar (with props)
Removed:
incubator/pig/trunk/lib/hadoop15.jar
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/build.xml
incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java
incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java
incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java
incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java
incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu May 22 12:36:29 2008
@@ -295,3 +295,5 @@
PIG-237: validation of the output directory (pi_song via olgan)
PIG-236: Fix properties so that values specified via the command line (-D) are not ignored (pkamath via gates).
+
+ PIG-198: integration with hadoop 0.17
Modified: incubator/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/build.xml?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/build.xml (original)
+++ incubator/pig/trunk/build.xml Thu May 22 12:36:29 2008
@@ -42,7 +42,7 @@
<property name="dist.dir" value="${build.dir}/${final.name}" />
<property name="build.encoding" value="ISO-8859-1" />
<!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
- <property name="hadoop.jarfile" value="hadoop16.jar" />
+ <property name="hadoop.jarfile" value="hadoop17.jar" />
<!-- jar names. TODO we might want to use the svn reversion name in the name in case it is a dev version -->
<property name="output.jarfile" value="${build.dir}/${final.name}.jar" />
Added: incubator/pig/trunk/lib/hadoop17.jar
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/lib/hadoop17.jar?rev=659223&view=auto
==============================================================================
Binary file - no diff available.
Propchange: incubator/pig/trunk/lib/hadoop17.jar
------------------------------------------------------------------------------
svn:mime-type = application/octet-stream
Modified: incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/datastorage/ElementDescriptor.java Thu May 22 12:36:29 2008
@@ -130,7 +130,14 @@
*/
public void updateConfiguration(Properties newConfig)
throws IOException;
-
+
+ /**
+ * Defines whether the element is visible to users or
+ * contains system's metadata
+ * @return true if this is system file; false otherwise
+ */
+ public boolean systemElement();
+
/**
* List entity statistics
* @return DataStorageProperties
Modified: incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/executionengine/PigSlicer.java Thu May 22 12:36:29 2008
@@ -66,6 +66,10 @@
for (int j = 0; j < paths.size(); j++) {
ElementDescriptor fullPath = store.asElement(store
.getActiveContainer(), paths.get(j));
+ // Skip hadoop's private/meta files ...
+ if (fullPath.systemElement()) {
+ continue;
+ }
if (fullPath instanceof ContainerDescriptor) {
for (ElementDescriptor child : ((ContainerDescriptor) fullPath)) {
paths.add(child);
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Thu May 22 12:36:29 2008
@@ -2,13 +2,16 @@
import java.io.IOException;
import java.net.URI;
+import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.dfs.DistributedFileSystem;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -183,15 +186,21 @@
public HPath[] asCollection(String pattern) throws DataStorageException {
try {
- Path[] paths = this.fs.globPaths(new Path(pattern));
-
- HPath[] hpaths = new HPath[paths.length];
-
- for (int i = 0; i < paths.length; ++i) {
- hpaths[i] = ((HPath) this.asElement(paths[i].toString()));
- }
-
- return hpaths;
+ FileStatus[] paths = this.fs.globStatus(new Path(pattern));
+
+ if (paths == null)
+ return new HPath[0];
+
+ List<HPath> hpaths = new ArrayList<HPath>();
+
+ for (int i = 0; i < paths.length; ++i) {
+ HPath hpath = (HPath)this.asElement(paths[i].getPath().toString());
+ if (!hpath.systemElement()) {
+ hpaths.add(hpath);
+ }
+ }
+
+ return hpaths.toArray(new HPath[hpaths.size()]);
} catch (IOException e) {
throw new DataStorageException("Failed to obtain glob for "
+ pattern, e);
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Thu May 22 12:36:29 2008
@@ -151,6 +151,12 @@
public FileSystem getHFS() {
return fs.getHFS();
}
+
+ public boolean systemElement() {
+ return (path != null &&
+ (path.getName().startsWith("_") ||
+ path.getName().startsWith(".")));
+ }
@Override
public String toString() {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu May 22 12:36:29 2008
@@ -11,6 +11,8 @@
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketImplFactory;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Enumeration;
@@ -576,16 +578,33 @@
}
private String fixUpDomain(String hostPort,Properties properties) throws UnknownHostException {
- String parts[] = hostPort.split(":");
- if (parts[0].indexOf('.') == -1) {
+ URI uri = null;
+ try {
+ uri = new URI(hostPort);
+ } catch (URISyntaxException use) {
+ throw new RuntimeException("Illegal hostPort: " + hostPort);
+ }
+
+ String hostname = uri.getHost();
+ int port = uri.getPort();
+
+ // Parse manually if hostPort wasn't non-opaque URI
+ // e.g. hostPort is "myhost:myport"
+ if (hostname == null || port == -1) {
+ String parts[] = hostPort.split(":");
+ hostname = parts[0];
+ port = Integer.valueOf(parts[1]);
+ }
+
+ if (hostname.indexOf('.') == -1) {
//jz: fallback to systemproperty cause this not handled in Main
String domain = properties.getProperty("cluster.domain",System.getProperty("cluster.domain"));
if (domain == null)
throw new RuntimeException("Missing cluster.domain property!");
- parts[0] = parts[0] + "." + domain;
+ hostname = hostname + "." + domain;
}
- InetAddress.getByName(parts[0]);
- return parts[0] + ":" + parts[1];
+ InetAddress.getByName(hostname);
+ return hostname + ":" + Integer.toString(port);
}
// create temp dir to store hod output; removed on exit
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigCombine.java Thu May 22 12:36:29 2008
@@ -40,7 +40,8 @@
import org.apache.pig.impl.util.ObjectSerializer;
-public class PigCombine implements Reducer {
+public class PigCombine implements
+ Reducer<Tuple, IndexedTuple, Tuple, IndexedTuple> {
private final Log log = LogFactory.getLog(getClass());
@@ -53,7 +54,8 @@
private PigContext pigContext;
private EvalSpec esp;
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+ public void reduce(Tuple key, Iterator<IndexedTuple> values,
+ OutputCollector<Tuple, IndexedTuple> output, Reporter reporter)
throws IOException {
try {
@@ -70,8 +72,8 @@
index = PigInputFormat.getActiveSplit().getIndex();
- Datum groupName = ((Tuple) key).getField(0);
- finalout.group = ((Tuple) key);
+ Datum groupName = key.getField(0);
+ finalout.group = key;
finalout.index = index;
Tuple t = new Tuple(1 + inputCount);
@@ -82,7 +84,7 @@
}
while (values.hasNext()) {
- IndexedTuple it = (IndexedTuple) values.next();
+ IndexedTuple it = values.next();
t.getBagField(it.index + 1).add(it.toTuple());
}
for (int i = 0; i < inputCount; i++) { // XXX: shouldn't we only do this if INNER flag is set?
@@ -124,8 +126,9 @@
public void close() throws IOException {
}
- private static class CombineDataOutputCollector extends DataCollector {
- OutputCollector oc = null;
+ private static class CombineDataOutputCollector
+ extends DataCollector {
+ OutputCollector<Tuple, IndexedTuple> oc = null;
Tuple group = null;
int index = -1;
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigMapReduce.java Thu May 22 12:36:29 2008
@@ -77,7 +77,8 @@
*
* @author breed
*/
-public class PigMapReduce implements MapRunnable, Reducer {
+public class PigMapReduce implements MapRunnable<WritableComparable, Tuple, WritableComparable, Writable>,
+ Reducer<Tuple, IndexedTuple, WritableComparable, Writable> {
private final Log log = LogFactory.getLog(getClass());
@@ -100,7 +101,8 @@
* the tuples from our PigRecordReader (see ugly ThreadLocal hack), pipe the tuples through the
* function pipeline and then close the writer.
*/
- public void run(RecordReader input, OutputCollector output, Reporter reporter) throws IOException {
+public void run(RecordReader<WritableComparable, Tuple> input,
+ OutputCollector<WritableComparable, Writable> output, Reporter reporter) throws IOException {
PigMapReduce.reporter = reporter;
oc = output;
@@ -110,9 +112,9 @@
// allocate key & value instances that are re-used for all entries
WritableComparable key = input.createKey();
- Writable value = input.createValue();
+ Tuple value = input.createValue();
while (input.next(key, value)) {
- evalPipe.add((Tuple) value);
+ evalPipe.add(value);
}
} finally {
try {
@@ -130,7 +132,7 @@
}
}
- public void reduce(WritableComparable key, Iterator values, OutputCollector output, Reporter reporter)
+ public void reduce(Tuple key, Iterator<IndexedTuple> values, OutputCollector<WritableComparable, Writable> output, Reporter reporter)
throws IOException {
PigMapReduce.reporter = reporter;
@@ -142,7 +144,7 @@
}
DataBag[] bags = new DataBag[inputCount];
- Datum groupName = ((Tuple) key).getField(0);
+ Datum groupName = key.getField(0);
Tuple t = new Tuple(1 + inputCount);
t.setField(0, groupName);
for (int i = 1; i < 1 + inputCount; i++) {
@@ -151,7 +153,7 @@
}
while (values.hasNext()) {
- IndexedTuple it = (IndexedTuple) values.next();
+ IndexedTuple it = values.next();
t.getBagField(it.index + 1).add(it.toTuple());
}
@@ -254,8 +256,8 @@
if (splitSpec == null){
pigWriter = (PigRecordWriter) job.getOutputFormat().getRecordWriter(FileSystem.get(job), job, fileName,
reporter);
- oc = new OutputCollector() {
- public void collect(WritableComparable key, Writable value) throws IOException {
+ oc = new OutputCollector<WritableComparable, Tuple>() {
+ public void collect(WritableComparable key, Tuple value) throws IOException {
pigWriter.write(key, value);
}
};
@@ -280,15 +282,14 @@
for (String name: splitSpec.tempFiles){
sideFileWriters.add( outputFormat.getRecordWriter(FileSystem.get(job), job, new Path(name), "split-" + getTaskId(), reporter));
}
- return new OutputCollector(){
- public void collect(WritableComparable key, Writable value) throws IOException {
- Tuple t = (Tuple) value;
+ return new OutputCollector<WritableComparable, Tuple>(){
+ public void collect(WritableComparable key, Tuple value) throws IOException {
ArrayList<Cond> conditions = splitSpec.conditions;
for (int i=0; i< conditions.size(); i++){
Cond cond = conditions.get(i);
- if (cond.eval(t)){
+ if (cond.eval(value)){
//System.out.println("Writing " + t + " to condition " + cond);
- sideFileWriters.get(i).write(null, t);
+ sideFileWriters.get(i).write(null, value);
}
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/PigOutputFormat.java Thu May 22 12:36:29 2008
@@ -25,6 +25,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
@@ -43,7 +44,7 @@
public RecordWriter getRecordWriter(FileSystem fs, JobConf job, String name, Progressable progress)
throws IOException {
- Path outputDir = job.getOutputPath();
+ Path outputDir = FileOutputFormat.getOutputPath(job);
return getRecordWriter(fs, job, outputDir, name, progress);
}
@@ -56,12 +57,8 @@
} else {
store = (StoreFunc) PigContext.instantiateFuncFromSpec(storeFunc);
}
- // The outputDir is going to be a temporary name we need to look at the
- // real thing!
- // XXX This is a wretched implementation dependent hack! Need to find
- // out from the Hadoop guys if there is a better way.
- String parentName = job.getOutputPath().getParent().getParent()
- .getName();
+
+ String parentName = FileOutputFormat.getOutputPath(job).getName();
int suffixStart = parentName.lastIndexOf('.');
if (suffixStart != -1) {
String suffix = parentName.substring(suffixStart);
@@ -77,7 +74,7 @@
return;
}
- static public class PigRecordWriter implements RecordWriter {
+ static public class PigRecordWriter implements RecordWriter<WritableComparable, Tuple> {
private OutputStream os = null;
private StoreFunc sfunc = null;
@@ -98,8 +95,8 @@
* @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable,
* org.apache.hadoop.io.Writable)
*/
- public void write(WritableComparable key, Writable value) throws IOException {
- this.sfunc.putNext((Tuple) value);
+ public void write(WritableComparable key, Tuple value) throws IOException {
+ this.sfunc.putNext(value);
}
public void close(Reporter reporter) throws IOException {
Modified: incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapreduceExec/SortPartitioner.java Thu May 22 12:36:29 2008
@@ -23,8 +23,7 @@
import java.util.Arrays;
import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -33,17 +32,15 @@
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
-
-public class SortPartitioner implements Partitioner {
+public class SortPartitioner implements Partitioner<Tuple, Writable> {
Tuple[] quantiles;
- WritableComparator comparator;
-
- public int getPartition(WritableComparable key, Writable value,
- int numPartitions) {
- Tuple keyTuple = (Tuple)key;
- int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0), comparator);
- if (index < 0)
- index = -index-1;
+ RawComparator comparator;
+
+ public int getPartition(Tuple key, Writable value,
+ int numPartitions) {
+ int index = Arrays.binarySearch(quantiles, key.getTupleField(0), comparator);
+ if (index < 0)
+ index = -index-1;
return Math.min(index, numPartitions - 1);
}
Modified: incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/backend/local/datastorage/LocalPath.java Thu May 22 12:36:29 2008
@@ -136,6 +136,10 @@
public int compareTo(ElementDescriptor other) {
return this.path.compareTo(((LocalPath)other).path);
}
+
+ public boolean systemElement(){
+ return false;
+ }
public String toString() {
return this.path.toString();
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=659223&r1=659222&r2=659223&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu May 22 12:36:29 2008
@@ -166,6 +166,8 @@
if (elem.exists()) {
try {
if(! elem.getDataStorage().isContainer(elem.toString())) {
+ if (elem.systemElement())
+ throw new IOException ("Attempt is made to open system file " + elem.toString());
return elem.open();
}
}
@@ -183,7 +185,9 @@
((ContainerDescriptor)elem).iterator();
while (allElements.hasNext()) {
- arrayList.add(allElements.next());
+ ElementDescriptor nextElement = allElements.next();
+ if (!nextElement.systemElement())
+ arrayList.add(nextElement);
}
elements = new ElementDescriptor[ arrayList.size() ];