You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/11/01 17:52:41 UTC
svn commit: r1029741 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/LoadFunc.java test/org/apache/pig/test/TestBZip.java
test/org/apache/pig/test/TestLoad.java
test/org/apache/pig/test/TestStore.java
Author: yanz
Date: Mon Nov 1 16:52:41 2010
New Revision: 1029741
URL: http://svn.apache.org/viewvc?rev=1029741&view=rev
Log:
PIG-1704: Output Compression is not at work if the output path is absolute and there is a trailing / after the compression suffix (yanz)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/LoadFunc.java
pig/trunk/test/org/apache/pig/test/TestBZip.java
pig/trunk/test/org/apache/pig/test/TestLoad.java
pig/trunk/test/org/apache/pig/test/TestStore.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1029741&r1=1029740&r2=1029741&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Nov 1 16:52:41 2010
@@ -217,6 +217,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1704: Output Compression is not at work if the output path is absolute and there is a trailing / after the compression suffix (yanz)
+
PIG-1695: MergeForEach does not carry user defined schema if any one of the merged ForEach has user defined schema (daijy)
PIG-1684: Inconsistent usage of store func. (thejas)
Modified: pig/trunk/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/LoadFunc.java?rev=1029741&r1=1029740&r2=1029741&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/LoadFunc.java (original)
+++ pig/trunk/src/org/apache/pig/LoadFunc.java Mon Nov 1 16:52:41 2010
@@ -270,6 +270,8 @@ public abstract class LoadFunc {
new Path(curDir, path).toString();
}
fname = fname.replaceFirst("^file:/([^/])", "file:///$1");
+ // remove the trailing /
+ fname = fname.replaceFirst("/$", "");
pathStrings.add(fname);
}
Modified: pig/trunk/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestBZip.java?rev=1029741&r1=1029740&r2=1029741&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestBZip.java Mon Nov 1 16:52:41 2010
@@ -117,6 +117,65 @@ public class TestBZip {
out.delete();
}
+ /**
+ * Tests the end-to-end writing and reading of a BZip file using absolute path with a trailing /.
+ */
+ @Test
+ public void testBzipInPig2() throws Exception {
+ PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+ File in = File.createTempFile("junit", ".bz2");
+ in.deleteOnExit();
+
+ File out = File.createTempFile("junit", ".bz2");
+ out.deleteOnExit();
+ out.delete();
+
+ CBZip2OutputStream cos =
+ new CBZip2OutputStream(new FileOutputStream(in));
+ for (int i = 1; i < 100; i++) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(i).append("\n").append(-i).append("\n");
+ byte bytes[] = sb.toString().getBytes();
+ cos.write(bytes);
+ }
+ cos.close();
+
+ pig.registerQuery("AA = load '"
+ + Util.generateURI(in.getAbsolutePath(), pig.getPigContext())
+ + "';");
+ pig.registerQuery("A = foreach (group (filter AA by $0 > 0) all) generate flatten($1);");
+ pig.registerQuery("store A into '" + out.getAbsolutePath() + "/';");
+ FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+ pig.getPigContext().getProperties()));
+ FSDataInputStream is = fs.open(new Path(out.getAbsolutePath() +
+ "/part-r-00000.bz2"));
+ CBZip2InputStream cis = new CBZip2InputStream(is);
+
+ // Just a sanity check, to make sure it was a bzip file; we
+ // will do the value verification later
+ assertEquals(100, cis.read(new byte[100]));
+ cis.close();
+
+ pig.registerQuery("B = load '" + out.getAbsolutePath() + "';");
+
+ Iterator<Tuple> i = pig.openIterator("B");
+ HashMap<Integer, Integer> map = new HashMap<Integer, Integer>();
+ while (i.hasNext()) {
+ Integer val = DataType.toInteger(i.next().get(0));
+ map.put(val, val);
+ }
+
+ assertEquals(new Integer(99), new Integer(map.keySet().size()));
+
+ for (int j = 1; j < 100; j++) {
+ assertEquals(new Integer(j), map.get(j));
+ }
+
+ in.delete();
+ out.delete();
+ }
+
/**
* Tests that '\n', '\r' and '\r\n' are treated as record delims when using
* bzip just like they are when using uncompressed text
Modified: pig/trunk/test/org/apache/pig/test/TestLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLoad.java?rev=1029741&r1=1029740&r2=1029741&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLoad.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestLoad.java Mon Nov 1 16:52:41 2010
@@ -176,7 +176,7 @@ public class TestLoad extends junit.fram
for (PigServer pig : servers) {
pc = pig.getPigContext();
boolean noConversionExpected = true;
- checkLoadPath("/tmp/foo/../././","/tmp/foo/../././", noConversionExpected);
+ checkLoadPath("/tmp/foo/../././","/tmp/foo/.././.", noConversionExpected);
}
}
@@ -266,6 +266,7 @@ public class TestLoad extends junit.fram
String nonDfsUrl = "har://hdfs-namenode/user/foo/";
LogicalPlan lp = lpt.buildPlan("a = load '" + nonDfsUrl + "';");
LOLoad load = (LOLoad) lp.getRoots().get(0);
+ nonDfsUrl = nonDfsUrl.replaceFirst("/$", "");
Assert.assertEquals(nonDfsUrl, load.getInputFile().getFileName());
}
Modified: pig/trunk/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestStore.java?rev=1029741&r1=1029740&r2=1029741&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestStore.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestStore.java Mon Nov 1 16:52:41 2010
@@ -390,7 +390,7 @@ public class TestStore extends junit.fra
@Test
public void testStoreRemoteNormalize() throws Exception {
- checkStorePath("/tmp/foo/../././","/tmp/foo/../././");
+ checkStorePath("/tmp/foo/../././","/tmp/foo/.././.");
}
@Test