You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2008/05/02 04:30:37 UTC
svn commit: r652735 - in /incubator/pig/trunk: CHANGES.txt
src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
test/org/apache/pig/test/TestStreaming.java
Author: olga
Date: Thu May 1 19:30:37 2008
New Revision: 652735
URL: http://svn.apache.org/viewvc?rev=652735&view=rev
Log:
PIG-226: fix for streaming optimization bug
Modified:
incubator/pig/trunk/CHANGES.txt
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=652735&r1=652734&r2=652735&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Thu May 1 19:30:37 2008
@@ -257,3 +257,6 @@
PIG-151: fixes to code that handles bzip files
PIG-222: fix build break
+
+ PIG-226: fix for streaming optimization bug
+
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java?rev=652735&r1=652734&r2=652735&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/LoadOptimizer.java Thu May 1 19:30:37 2008
@@ -20,6 +20,7 @@
import java.util.List;
import org.apache.pig.LoadFunc;
+import org.apache.pig.StoreFunc;
import org.apache.pig.builtin.BinaryStorage;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.eval.EvalSpec;
@@ -73,27 +74,31 @@
FileSpec loadFileSpec = load.getInputFileSpec();
- // Instantiate both LoadFunc objects to compare them for
- // equality
- LoadFunc streamLoader =
- (LoadFunc)PigContext.instantiateFuncFromSpec(
+ // Instantiate both to compare them for equality
+ StoreFunc streamStorer =
+ (StoreFunc)PigContext.instantiateFuncFromSpec(
streamInputSpec.getSpec());
LoadFunc inputLoader = (LoadFunc)PigContext.instantiateFuncFromSpec(
loadFileSpec.getFuncSpec());
- // Check if both LoadFunc objects belong to the same type
+ // Check if the streaming command's inputSpec also implements
+ // LoadFunc and if it does, are they of the same type?
boolean sameType = false;
try {
- streamLoader.getClass().cast(inputLoader);
- sameType = true;
+ // TODO: We should actually check if the streamStorer
+ // is _reversible_ as the inputLoader ...
+ if (streamStorer instanceof LoadFunc) {
+ streamStorer.getClass().cast(inputLoader);
+ sameType = true;
+ }
} catch (ClassCastException cce) {
sameType = false;
}
// Check if the stream storer and the input loader belong to the
// same type and are equivalent
- if (sameType && streamLoader.equals(inputLoader)) {
+ if (sameType && streamStorer.equals(inputLoader)) {
// Since they both are the same, we can flip them
// for BinaryStorage
load.setInputFileSpec(new FileSpec(loadFileSpec.getFileName(), BinaryStorage.class.getName()));
Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java?rev=652735&r1=652734&r2=652735&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/streaming/StoreOptimizer.java Thu May 1 19:30:37 2008
@@ -19,6 +19,7 @@
import java.util.List;
+import org.apache.pig.LoadFunc;
import org.apache.pig.StoreFunc;
import org.apache.pig.builtin.BinaryStorage;
import org.apache.pig.impl.PigContext;
@@ -98,28 +99,32 @@
FileSpec storeFileSpec = s.getOutputFileSpec();
- // Instantiate both StoreFunc objects to compare them for
- // equality
- StoreFunc streamStorer =
- (StoreFunc)PigContext.instantiateFuncFromSpec(
+ // Instantiate both to compare them for equality
+ LoadFunc streamLoader =
+ (LoadFunc)PigContext.instantiateFuncFromSpec(
streamOutputSpec.getSpec());
StoreFunc outputStorer = (StoreFunc)PigContext.instantiateFuncFromSpec(
storeFileSpec.getFuncSpec());
- // Check if both LoadFunc objects belong to the same type
+ // Check if the streaming command's outputSpec also implements
+ // StoreFunc and if it does, are they of the same type?
boolean sameType = false;
try {
- streamStorer.getClass().cast(outputStorer);
- sameType = true;
+ // TODO: We should actually check if the streamLoader
+ // is _reversible_ as the outputStorer ...
+ if (streamLoader instanceof StoreFunc) {
+ streamLoader.getClass().cast(outputStorer);
+ sameType = true;
+ }
} catch (ClassCastException cce) {
sameType = false;
}
// Check if the stream loader and the output storer belong to the
// same type and are equivalent
- if (sameType && streamStorer.equals(outputStorer)) {
+ if (sameType && streamLoader.equals(outputStorer)) {
// Since they both are the same, we can flip them
// for BinaryStorage
s.setOutputFileSpec(new FileSpec(storeFileSpec.getFileName(), BinaryStorage.class.getName()));
Modified: incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java?rev=652735&r1=652734&r2=652735&view=diff
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java (original)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestStreaming.java Thu May 1 19:30:37 2008
@@ -470,4 +470,42 @@
Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
}
+ @Test
+ public void testLocalNegativeLoadStoreOptimization() throws Exception {
+ testNegativeLoadStoreOptimization(ExecType.LOCAL);
+ }
+
+ @Test
+ public void testMRNegativeLoadStoreOptimization() throws Exception {
+ testNegativeLoadStoreOptimization(ExecType.MAPREDUCE);
+ }
+
+ private void testNegativeLoadStoreOptimization(ExecType execType)
+ throws Exception {
+ PigServer pigServer = createPigServer(execType);
+ File input = Util.createInputFile("tmp", "",
+ new String[] {"A,1", "B,2", "C,3", "D,2",
+ "A,5", "B,5", "C,8", "A,8",
+ "D,8", "A,9"});
+
+ // Expected results
+ String[] expectedFirstFields = new String[] {"A", "B", "C", "A", "D", "A"};
+ int[] expectedSecondFields = new int[] {5, 5, 8, 8, 8, 9};
+ Tuple[] expectedResults =
+ setupExpectedResults(expectedFirstFields, expectedSecondFields);
+
+ // Pig query to run
+ pigServer.registerQuery("define CMD `"+ simpleEchoStreamingCommand +
+ "` input(stdin using PigDump());");
+ pigServer.registerQuery("IP = load 'file:" + input + "' using " +
+ PigStorage.class.getName() + "(',') " +
+ "split by 'file';");
+ pigServer.registerQuery("FILTERED_DATA = filter IP by $1 > '3';");
+ pigServer.registerQuery("OP = stream FILTERED_DATA through `" +
+ simpleEchoStreamingCommand + "`;");
+
+ // Run the query and check the results
+ Util.checkQueryOutputs(pigServer.openIterator("OP"), expectedResults);
+ }
+
}