You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/05/09 04:40:11 UTC
svn commit: r654669 - in /incubator/pig/trunk: CHANGES.txt
src/org/apache/pig/impl/io/FileLocalizer.java
src/org/apache/pig/impl/streaming/StreamingCommand.java
test/org/apache/pig/test/TestStreaming.java
Author: olga
Date: Thu May 8 19:40:10 2008
New Revision: 654669
URL: http://svn.apache.org/viewvc?rev=654669&view=rev
Log:
PIG-232: let valid cache specifications through
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=654669&r1=654668&r2=654669&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu May 8 19:40:10 2008
@@ -289,3 +289,5 @@
PIG-232: fix for number of input records when BinaryStirage is used
(acmurthy via olgan)
+
+ PIG-232: let valid cache specifications through (acmurthy via olgan)
Modified: incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=654669&r1=654668&r2=654669&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Thu May 8 19:40:10 2008
@@ -344,6 +344,27 @@
return elem.exists() || globMatchesFiles(elem, store);
}
+ /**
+ * Checks whether <code>filename</code> names an existing, non-directory
+ * element in the DataStorage (DFS) of <code>context</code>.
+ *
+ * @param filename path to check
+ * @param context pig context whose DFS is queried
+ * @return true iff the path exists and is not a directory
+ * @throws IOException if the storage cannot be queried
+ */
+ public static boolean isFile(String filename, PigContext context)
+ throws IOException {
+ return isFile(filename, context.getDfs());
+ }
+
+ /**
+ * Checks whether <code>filename</code> names an existing, non-directory
+ * element in <code>store</code>.
+ * <p>
+ * Existence is tested explicitly: "not a directory" on its own would also
+ * be true for a path that does not exist at all.
+ *
+ * @param filename path to check
+ * @param store storage to query
+ * @return true iff the path exists and is not a directory
+ * @throws IOException if the storage cannot be queried
+ */
+ public static boolean isFile(String filename, DataStorage store)
+ throws IOException {
+ ElementDescriptor elem = store.asElement(filename);
+ return elem.exists() && !(elem instanceof ContainerDescriptor);
+ }
+
+ /**
+ * Checks whether <code>filename</code> names an existing directory in the
+ * DataStorage (DFS) of <code>context</code>.
+ *
+ * @param filename path to check
+ * @param context pig context whose DFS is queried
+ * @return true iff the path exists and is a directory
+ * @throws IOException if the storage cannot be queried
+ */
+ public static boolean isDirectory(String filename, PigContext context)
+ throws IOException {
+ return isDirectory(filename, context.getDfs());
+ }
+
+ /**
+ * Checks whether <code>filename</code> names an existing directory in
+ * <code>store</code>, i.e. resolves to a ContainerDescriptor.
+ * <p>
+ * NOTE(review): relies on DataStorage.asElement returning a
+ * ContainerDescriptor for directories; the explicit exists() check keeps
+ * the answer false for missing paths regardless of what asElement returns.
+ *
+ * @param filename path to check
+ * @param store storage to query
+ * @return true iff the path exists and is a directory
+ * @throws IOException if the storage cannot be queried
+ */
+ public static boolean isDirectory(String filename, DataStorage store)
+ throws IOException {
+ ElementDescriptor elem = store.asElement(filename);
+ return elem.exists() && (elem instanceof ContainerDescriptor);
+ }
+
private static boolean globMatchesFiles(ElementDescriptor elem,
DataStorage fs)
throws IOException
Modified: incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java?rev=654669&r1=654668&r2=654669&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/streaming/StreamingCommand.java Thu May 8 19:40:10 2008
@@ -3,6 +3,8 @@
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -142,7 +144,11 @@
// Validate
File file = new File(path);
if (!file.exists()) {
- throw new IOException("Invalid ship specification: " + path);
+ throw new IOException("Invalid ship specification: '" + path +
+ "' does not exist!");
+ } else if (file.isDirectory()) {
+ throw new IOException("Invalid ship specification: '" + path +
+ "' is a directory and can't be shipped!");
}
shipSpec.add(path);
@@ -156,10 +162,35 @@
*/
public void addPathToCache(String path) throws IOException {
// Validate
- if (!FileLocalizer.fileExists(path, pigContext)) {
- throw new IOException("Invalid cache specification: " + path);
- }
+ // A cache specification may carry a URI fragment naming the file as the
+ // streaming command refers to it (e.g. "<dfs-path>#script1.pl" in
+ // TestStreaming), and possibly a query. Strip both so that only the
+ // underlying DFS path is validated; the full spec is still recorded.
+ URI dfsPath;
+ try {
+ URI pathUri = new URI(path);
+ dfsPath = new URI(pathUri.getScheme(), pathUri.getAuthority(),
+ pathUri.getPath(), null, null);
+ } catch (URISyntaxException urise) {
+ IOException ioe =
+ new IOException("Invalid cache specification: " + path);
+ // Keep the parse error so the offending character/position is visible
+ ioe.initCause(urise);
+ throw ioe;
+ }
+
+ String dfsName = dfsPath.toString();
+ boolean exists;
+ try {
+ exists = FileLocalizer.fileExists(dfsName, pigContext);
+ } catch (IOException ioe) {
+ // Throw a better error message, but chain the original failure so an
+ // I/O problem (e.g. DFS unreachable) is not masked as "missing".
+ IOException wrapped =
+ new IOException("Invalid cache specification: '" + dfsPath +
+ "' does not exist!");
+ wrapped.initCause(ioe);
+ throw wrapped;
+ }
+
+ if (!exists) {
+ throw new IOException("Invalid cache specification: '" + dfsPath +
+ "' does not exist!");
+ } else if (FileLocalizer.isDirectory(dfsName, pigContext)) {
+ throw new IOException("Invalid cache specification: '" + dfsPath +
+ "' is a directory and can't be cached!");
+ }
+
cacheSpec.add(path);
}
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=654669&r1=654668&r2=654669&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu May 8 19:40:10 2008
@@ -248,6 +248,79 @@
}
@Test
+ public void testInputCacheSpecs() throws Exception {
+ // Streams data through two commands whose perl scripts are fetched from
+ // HDFS via cache specifications; the '#' fragment of each spec is the
+ // name under which the command invokes its script.
+
+ // Can't run this without HDFS
+ if (execType == ExecType.LOCAL) {
+ return;
+ }
+
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3",
+ "D,2", "A,5", "B,5",
+ "C,8", "A,8", "D,8",
+ "A,9"});
+
+ // Perl script: echo the file named by its first argument to STDOUT,
+ // and a tagged copy of each line to STDERR
+ String[] script =
+ new String[] {
+ "#!/usr/bin/perl",
+ "open(INFILE, $ARGV[0]) or die \"Can't open \".$ARGV[0].\"!: $!\";",
+ "while (<INFILE>) {",
+ " chomp $_;",
+ " print STDOUT \"$_\n\";",
+ " print STDERR \"STDERR: $_\n\";",
+ "}",
+ };
+ // Copy the scripts to HDFS
+ File command1 = Util.createInputFile("script", "pl", script);
+ File command2 = Util.createInputFile("script", "pl", script);
+ String c1 = FileLocalizer.hadoopify(command1.toString(),
+ pigServer.getPigContext());
+ String c2 = FileLocalizer.hadoopify(command2.toString(),
+ pigServer.getPigContext());
+
+ // Expected results
+ String[] expectedFirstFields =
+ new String[] {"A", "B", "C", "A", "D", "A"};
+ int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+ Tuple[] expectedResults =
+ setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+ // Pig query to run
+ pigServer.registerQuery(
+ "define CMD1 `script1.pl foo` " +
+ "cache ('" + c1 + "#script1.pl') " +
+ "input('foo' using " + PigStorage.class.getName() + "(',')) " +
+ "stderr();");
+ pigServer.registerQuery(
+ "define CMD2 `script2.pl bar` " +
+ "cache ('" + c2 + "#script2.pl') " +
+ "input('bar' using " + PigStorage.class.getName() + "(',')) " +
+ "stderr();");
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
+ PigStorage.class.getName() + "(',');");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+ pigServer.registerQuery("STREAMED_DATA = stream FILTERED_DATA " +
+ "through CMD1;");
+ pigServer.registerQuery("OP = stream STREAMED_DATA through CMD2;");
+
+ // Run the query, storing its output
+ String output = "/pig/out";
+ pigServer.deleteFile(output);
+ pigServer.store("OP", output, PigStorage.class.getName() + "(',')");
+
+ // Read the stored output back, always releasing the DFS stream
+ List<Tuple> outputs = new ArrayList<Tuple>();
+ InputStream op = FileLocalizer.open(output, pigServer.getPigContext());
+ try {
+ PigStorage ps = new PigStorage(",");
+ ps.bindTo("", new BufferedPositionedInputStream(op), 0, Long.MAX_VALUE);
+ Tuple t;
+ while ((t = ps.getNext()) != null) {
+ outputs.add(t);
+ }
+ } finally {
+ op.close();
+ }
+
+ // Check the results
+ Util.checkQueryOutputs(outputs.iterator(), expectedResults);
+ }
+
+ @Test
public void testOutputShipSpecs() throws Exception {
// FIXME : this should be tested in all modes
if(execType == ExecType.LOCAL)