You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/07/17 19:04:08 UTC
svn commit: r677637 - in /incubator/pig/branches/types: ./
src/org/apache/pig/ src/org/apache/pig/backend/datastorage/
src/org/apache/pig/backend/hadoop/datastorage/
src/org/apache/pig/backend/local/datastorage/ src/org/apache/pig/data/
src/org/apache/...
Author: olga
Date: Thu Jul 17 10:04:07 2008
New Revision: 677637
URL: http://svn.apache.org/viewvc?rev=677637&view=rev
Log:
PIG-198 and PIG-106
Modified:
incubator/pig/branches/types/build.xml
incubator/pig/branches/types/src/org/apache/pig/ExecType.java
incubator/pig/branches/types/src/org/apache/pig/backend/datastorage/ElementDescriptor.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
incubator/pig/branches/types/src/org/apache/pig/backend/local/datastorage/LocalPath.java
incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigInputFormat.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigOutputFormat.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigSplit.java
incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Thu Jul 17 10:04:07 2008
@@ -18,14 +18,6 @@
<property name="src.gen.dir" value="${basedir}/src-gen/" />
<property name="src.docs.dir" value="${basedir}/docs/" />
- <!-- javac properties -->
- <property name="javac.debug" value="on" />
- <property name="javac.optimize" value="on" />
- <property name="javac.deprecation" value="off" />
- <property name="javac.version" value="1.5" />
- <property name="javac.args" value="" />
- <!-- TODO we should use warning... <property name="javac.args.warnings" value="-Xlint:unchecked" /> -->
- <property name="javac.args.warnings" value="" />
<!-- build properties -->
<property name="build.dir" value="${basedir}/build" />
@@ -35,7 +27,18 @@
<property name="dist.dir" value="${build.dir}/${final.name}" />
<property name="build.encoding" value="ISO-8859-1" />
<!-- TODO with only one version of hadoop in the lib folder we do not need that anymore -->
- <property name="hadoop.jarfile" value="hadoop16.jar" />
+ <property name="hadoop.jarfile" value="hadoop17.jar" />
+
+ <!-- javac properties -->
+ <property name="javac.debug" value="on" />
+ <property name="javac.optimize" value="on" />
+ <property name="javac.deprecation" value="on" />
+ <property name="javac.version" value="1.5" />
+ <property name="javac.args" value="" />
+ <!-- default warnings option -->
+ <property name="javac.args.warnings" value="-Xmaxwarns 1000000" />
+ <!-- warnings option if all.warnings property is set on cmdline -->
+ <property name="javac.args.all.warnings" value="-Xmaxwarns 1000000 -Xlint" />
<!-- jar names. TODO we might want to use the svn revision name in the name in case it is a dev version -->
<property name="output.jarfile" value="${build.dir}/${final.name}.jar" />
@@ -150,28 +153,62 @@
<!-- ================================================================== -->
<target name="compile" depends="init, cc-compile" description="Compile all artifacts">
<echo>*** Building Main Sources ***</echo>
+ <echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo>
+ <echo>*** If all.warnings property is supplied, compile-sources-all-warnings target will be executed ***</echo>
+ <echo>*** Else, compile-sources (which only warns about deprecations) target will be executed ***</echo>
+
<antcall target="compile-sources">
<param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2" />
<param name="dist" value="${build.classes}" />
<param name="cp" value="classpath" />
</antcall>
+
+ <antcall target="compile-sources-all-warnings">
+ <param name="sources" value="${src.dir};${src.gen.dir};${src.lib.dir}/shock;${src.lib.dir}/bzip2" />
+ <param name="dist" value="${build.classes}" />
+ <param name="cp" value="classpath" />
+ </antcall>
+
</target>
<target name="compile-test" depends="compile">
<echo>*** Building Test Sources ***</echo>
+ <echo>*** To compile with all warnings enabled, supply -Dall.warnings=1 on command line ***</echo>
+ <echo>*** If all.warnings property is supplied, compile-sources-all-warnings target will be executed ***</echo>
+ <echo>*** Else, compile-sources (which only warns about deprecations) target will be executed ***</echo>
+
<antcall target="compile-sources">
<param name="sources" value="${test.src.dir}" />
<param name="dist" value="${test.build.classes}" />
<param name="cp" value="test.classpath" />
</antcall>
+
+ <antcall target="compile-sources-all-warnings">
+ <param name="sources" value="${test.src.dir}" />
+ <param name="dist" value="${test.build.classes}" />
+ <param name="cp" value="test.classpath" />
+ </antcall>
+
</target>
- <target name="compile-sources">
+ <!-- This target is for default compilation -->
+ <target name="compile-sources" unless="all.warnings">
<javac encoding="${build.encoding}" srcdir="${sources}"
includes="**/*.java" destdir="${dist}" debug="${javac.debug}"
optimize="${javac.optimize}" target="${javac.version}"
source="${javac.version}" deprecation="${javac.deprecation}">
- <compilerarg line="${javac.args} ${javac.args.warnings}" />
+ <compilerarg line="${javac.args} ${javac.args.warnings}"/>
+ <classpath refid="${cp}" />
+ </javac>
+ </target>
+
+ <!-- this target is for compilation with all warnings enabled -->
+ <target name="compile-sources-all-warnings" if="all.warnings">
+ <javac encoding="${build.encoding}" srcdir="${sources}"
+ includes="**/*.java" destdir="${dist}" debug="${javac.debug}"
+ optimize="${javac.optimize}" target="${javac.version}"
+ source="${javac.version}" deprecation="${javac.deprecation}">
+ <compilerarg line="${javac.args} ${javac.args.all.warnings} "/>
<classpath refid="${cp}" />
</javac>
</target>
Modified: incubator/pig/branches/types/src/org/apache/pig/ExecType.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/ExecType.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/ExecType.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/ExecType.java Thu Jul 17 10:04:07 2008
@@ -15,6 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.pig;
/**
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/datastorage/ElementDescriptor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/datastorage/ElementDescriptor.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/datastorage/ElementDescriptor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/datastorage/ElementDescriptor.java Thu Jul 17 10:04:07 2008
@@ -140,6 +140,13 @@
*/
public void updateConfiguration(Properties newConfig)
throws IOException;
+
+ /**
+ * Defines whether the element is visible to users or
+ * contains system metadata
+ * @return true if this is a system file; false otherwise
+ */
+ public boolean systemElement();
/**
* List entity statistics
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HDataStorage.java Thu Jul 17 10:04:07 2008
@@ -20,11 +20,14 @@
import java.net.URI;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Properties;
import java.util.Enumeration;
import java.util.Map;
import java.util.HashMap;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.dfs.DistributedFileSystem;
@@ -194,19 +197,24 @@
public HPath[] asCollection(String pattern) throws DataStorageException {
try {
- Path[] paths = this.fs.globPaths(new Path(pattern));
-
- HPath[] hpaths = new HPath[ paths.length ];
+ FileStatus[] paths = this.fs.globStatus(new Path(pattern));
+
+ if (paths == null)
+ return new HPath[0];
+
+ List<HPath> hpaths = new ArrayList<HPath>();
for (int i = 0; i < paths.length; ++i) {
- hpaths[ i ] = ((HPath)this.asElement(paths[ i ].toString()));
+ HPath hpath = (HPath)this.asElement(paths[i].getPath().toString());
+ if (!hpath.systemElement()) {
+ hpaths.add(hpath);
+ }
}
-
-
- return hpaths;
- }
- catch (IOException e) {
- throw new DataStorageException("Failed to obtain glob for " + pattern, e);
+
+ return hpaths.toArray(new HPath[hpaths.size()]);
+ } catch (IOException e) {
+ throw new DataStorageException("Failed to obtain glob for "
+ + pattern, e);
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HFile.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HFile.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HFile.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HFile.java Thu Jul 17 10:04:07 2008
@@ -75,6 +75,6 @@
public SeekableInputStream sopen() throws IOException {
return new HSeekableInputStream(fs.getHFS().open(path),
- fs.getHFS().getContentLength(path));
+ fs.getHFS().getContentSummary(path).getLength());
}
}
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Thu Jul 17 10:04:07 2008
@@ -110,7 +110,7 @@
public void delete() throws IOException {
// the file is removed and not placed in the trash bin
- fs.getHFS().delete(path);
+ fs.getHFS().delete(path, true);
}
public Properties getConfiguration() throws IOException {
@@ -168,7 +168,13 @@
public FileSystem getHFS() {
return fs.getHFS();
}
-
+
+ public boolean systemElement() {
+ return (path != null &&
+ (path.getName().startsWith("_") ||
+ path.getName().startsWith(".")));
+ }
+
@Override
public String toString() {
return path.toString();
Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/datastorage/LocalPath.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/datastorage/LocalPath.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/datastorage/LocalPath.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/datastorage/LocalPath.java Thu Jul 17 10:04:07 2008
@@ -152,6 +152,10 @@
public int compareTo(ElementDescriptor other) {
return this.path.compareTo(((LocalPath)other).path);
}
+
+ public boolean systemElement(){
+ return false;
+ }
public String toString() {
return this.path.toString();
Modified: incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/data/IndexedTuple.java Thu Jul 17 10:04:07 2008
@@ -45,7 +45,12 @@
@Override
public String toString() {
- return super.toString() + "[" + index + "]";
+ StringBuilder sb = new StringBuilder(super.toString());
+ sb.append("[");
+ sb.append(index);
+ sb.append("]");
+
+ return sb.toString();
}
// Writable methods:
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/io/FileLocalizer.java Thu Jul 17 10:04:07 2008
@@ -168,6 +168,8 @@
if (elem.exists()) {
try {
if(! elem.getDataStorage().isContainer(elem.toString())) {
+ if (elem.systemElement())
+ throw new IOException ("Attempt is made to open system file " + elem.toString());
return elem.open();
}
}
@@ -181,7 +183,10 @@
((ContainerDescriptor)elem).iterator();
while (allElements.hasNext()) {
- arrayList.add(allElements.next());
+ ElementDescriptor nextElement = allElements.next();
+ if (!nextElement.systemElement()) {
+ arrayList.add(nextElement);
+ }
}
elements = new ElementDescriptor[ arrayList.size() ];
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/JobControlCompiler.java Thu Jul 17 10:04:07 2008
@@ -39,6 +39,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
@@ -136,7 +137,7 @@
}
//Has dependencies. So compile all the inputs
- List compiledInputs = new ArrayList(pred.size());
+ List<Job> compiledInputs = new ArrayList<Job>(pred.size());
for (MapReduceOper oper : pred) {
Job ret = null;
@@ -154,7 +155,7 @@
//Create a new Job with the obtained JobConf
//and the compiled inputs as dependent jobs
- return new Job(currJC,(ArrayList)compiledInputs);
+ return new Job(currJC,(ArrayList<Job>)compiledInputs);
}catch(Exception e){
JobCreationException jce = new JobCreationException(e);
throw jce;
@@ -242,7 +243,7 @@
//set out filespecs
String outputPath = st.getSFile().getFileName();
FuncSpec outputFuncSpec = st.getSFile().getFuncSpec();
- jobConf.setOutputPath(new Path(outputPath));
+ FileOutputFormat.setOutputPath(jobConf, new Path(outputPath));
jobConf.set("pig.storeFunc", outputFuncSpec.toString());
if(mro.reducePlan.isEmpty()){
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigInputFormat.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigInputFormat.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigInputFormat.java Thu Jul 17 10:04:07 2008
@@ -32,6 +32,7 @@
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
@@ -92,17 +93,17 @@
* if zero items.
*/
protected Path[] listPaths(JobConf job) throws IOException {
- Path[] dirs = job.getInputPaths();
+ Path[] dirs = FileInputFormat.getInputPaths(job);
if (dirs.length == 0) {
throw new IOException("No input paths specified in job");
}
+
List<Path> result = new ArrayList<Path>();
for (Path p : dirs) {
FileSystem fs = p.getFileSystem(job);
- Path[] matches = fs.listPaths(fs.globPaths(p, hiddenFileFilter),
- hiddenFileFilter);
- for (Path match : matches) {
- result.add(fs.makeQualified(match));
+ FileStatus[] matches = fs.globStatus(p, hiddenFileFilter);
+ for (FileStatus match : matches) {
+ result.add(fs.makeQualified(match.getPath()));
}
}
@@ -187,14 +188,20 @@
ArrayList<Path> paths = new ArrayList<Path>();
// If you give a non-glob name, globPaths returns a single
// element with just that name.
- Path[] globPaths = fs.globPaths(path);
+
+ FileStatus[] matches = fs.globStatus(path, hiddenFileFilter);
+ List<Path> matchList = new ArrayList<Path>();
+ for (FileStatus match : matches) {
+ matchList.add(match.getPath());
+ }
+ Path[] globPaths = matchList.toArray(new Path[matchList.size()]);
for (int m = 0; m < globPaths.length; m++)
paths.add(globPaths[m]);
// paths.add(path);
for (int j = 0; j < paths.size(); j++) {
Path fullPath = new Path(fs.getWorkingDirectory(), paths.get(j));
if (fs.getFileStatus(fullPath).isDir()) {
- FileStatus children[] = fs.listStatus(fullPath);
+ FileStatus children[] = fs.listStatus(fullPath, hiddenFileFilter);
for (int k = 0; k < children.length; k++) {
paths.add(children[k].getPath());
}
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigOutputFormat.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigOutputFormat.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigOutputFormat.java Thu Jul 17 10:04:07 2008
@@ -24,6 +24,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputFormat;
import org.apache.hadoop.mapred.RecordWriter;
@@ -41,12 +42,12 @@
* image of PigInputFormat having RecordWriter instead
* of a RecordReader.
*/
-public class PigOutputFormat implements OutputFormat {
+public class PigOutputFormat implements OutputFormat<WritableComparable, Tuple> {
public static final String PIG_OUTPUT_FUNC = "pig.output.func";
- public RecordWriter getRecordWriter(FileSystem fs, JobConf job,
+ public RecordWriter<WritableComparable, Tuple> getRecordWriter(FileSystem fs, JobConf job,
String name, Progressable progress) throws IOException {
- Path outputDir = job.getOutputPath();
+ Path outputDir = FileOutputFormat.getWorkOutputPath(job);
return getRecordWriter(fs, job, outputDir, name, progress);
}
@@ -69,7 +70,8 @@
throw re;
}
}
- String parentName = outputDir.getParent().getName();
+
+ String parentName = FileOutputFormat.getOutputPath(job).getName();
int suffixStart = parentName.lastIndexOf('.');
if (suffixStart != -1) {
String suffix = parentName.substring(suffixStart);
@@ -85,7 +87,8 @@
return;
}
- static public class PigRecordWriter implements RecordWriter {
+ static public class PigRecordWriter implements
+ RecordWriter<WritableComparable, Tuple> {
private OutputStream os = null;
private StoreFunc sfunc = null;
@@ -93,7 +96,7 @@
public PigRecordWriter(FileSystem fs, Path file, StoreFunc sfunc)
throws IOException {
this.sfunc = sfunc;
- fs.delete(file);
+ fs.delete(file, true);
this.os = fs.create(file);
String name = file.getName();
if (name.endsWith(".bz") || name.endsWith(".bz2")) {
@@ -109,9 +112,9 @@
* @see org.apache.hadoop.mapred.RecordWriter#write(org.apache.hadoop.io.WritableComparable,
* org.apache.hadoop.io.Writable)
*/
- public void write(WritableComparable key, Writable value)
+ public void write(WritableComparable key, Tuple value)
throws IOException {
- this.sfunc.putNext((Tuple) value);
+ this.sfunc.putNext(value);
}
public void close(Reporter reporter) throws IOException {
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigSplit.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigSplit.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/PigSplit.java Thu Jul 17 10:04:07 2008
@@ -28,6 +28,7 @@
import java.util.ArrayList;
import java.util.List;
+import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.InputSplit;
@@ -119,16 +120,17 @@
}
public String[] getLocations() throws IOException {
- String hints[][] = fs.getFileCacheHints(file, start, length);
+ BlockLocation[] b = fs.getFileBlockLocations(file, start, length);
int total = 0;
- for (int i = 0; i < hints.length; i++) {
- total += hints[i].length;
+ for (int i = 0; i < b.length; i++) {
+ total += b[i].getHosts().length;
}
String locations[] = new String[total];
int count = 0;
- for (int i = 0; i < hints.length; i++) {
- for (int j = 0; j < hints[i].length; j++) {
- locations[count++] = hints[i][j];
+ for (int i = 0; i < b.length; i++) {
+ String hosts[] = b[i].getHosts();
+ for (int j = 0; j < hosts.length; j++) {
+ locations[count++] = hosts[j];
}
}
return locations;
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/mapReduceLayer/SortPartitioner.java Thu Jul 17 10:04:07 2008
@@ -23,8 +23,8 @@
import java.util.Arrays;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;
import org.apache.pig.backend.executionengine.ExecException;
@@ -34,10 +34,9 @@
import org.apache.pig.impl.io.BufferedPositionedInputStream;
import org.apache.pig.impl.io.FileLocalizer;
-
-public class SortPartitioner implements Partitioner {
+public class SortPartitioner implements Partitioner<WritableComparable, Writable> {
Tuple[] quantiles;
- WritableComparator comparator;
+ RawComparator<WritableComparable> comparator;
public int getPartition(WritableComparable key, Writable value,
int numPartitions){
Modified: incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/MiniCluster.java Thu Jul 17 10:04:07 2008
@@ -54,7 +54,7 @@
// Builds and starts the mini dfs and mapreduce clusters
m_dfs = new MiniDFSCluster(config, dataNodes, true, null);
m_fileSys = m_dfs.getFileSystem();
- m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getName(), 1);
+ m_mr = new MiniMRCluster(taskTrackers, m_fileSys.getUri().toString(), 1);
// Create the configuration hadoop-site.xml file
File conf_dir = new File(System.getProperty("user.home"), "pigtest/conf/");
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java?rev=677637&r1=677636&r2=677637&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestCombiner.java Thu Jul 17 10:04:07 2008
@@ -34,20 +34,24 @@
public class TestCombiner extends TestCase {
- @Test
- public void testLocal() throws Exception {
- // run the test locally
- runTest(new PigServer("local"));
- }
+
+ MiniCluster cluster = MiniCluster.buildCluster();
+
@Test
public void testOnCluster() throws Exception {
- // run the test on cluster
- MiniCluster.buildCluster();
+ // run the test on cluster
runTest(new PigServer("mapreduce"));
}
+ @Test
+ public void testLocal() throws Exception {
+ // run the test locally
+ runTest(new PigServer("local"));
+ }
+
+
private void runTest(PigServer pig) throws IOException {
List<String> inputLines = new ArrayList<String>();
inputLines.add("a,b,1");