You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/04/09 18:14:25 UTC
svn commit: r932484 - in /hadoop/pig/branches/branch-0.7: CHANGES.txt
src/org/apache/pig/builtin/PigStorage.java
test/org/apache/pig/test/TestPigStorage.java
Author: pradeepkth
Date: Fri Apr 9 16:14:24 2010
New Revision: 932484
URL: http://svn.apache.org/viewvc?rev=932484&view=rev
Log:
PIG-1366: PigStorage's pushProjection implementation results in NPE under certain data conditions (pradeepkth)
Modified:
hadoop/pig/branches/branch-0.7/CHANGES.txt
hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java
hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestPigStorage.java
Modified: hadoop/pig/branches/branch-0.7/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/CHANGES.txt?rev=932484&r1=932483&r2=932484&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/CHANGES.txt (original)
+++ hadoop/pig/branches/branch-0.7/CHANGES.txt Fri Apr 9 16:14:24 2010
@@ -181,6 +181,9 @@ OPTIMIZATIONS
BUG FIXES
+PIG-1366: PigStorage's pushProjection implementation results in NPE under
+certain data conditions (pradeepkth)
+
PIG-1365: WrappedIOException is missing from Pig.jar (pradeepkth)
PIG-1362: Provide udf context signature in ensureAllKeysInSameSplit() method
Modified: hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java?rev=932484&r1=932483&r2=932484&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java (original)
+++ hadoop/pig/branches/branch-0.7/src/org/apache/pig/builtin/PigStorage.java Fri Apr 9 16:14:24 2010
@@ -97,6 +97,7 @@ LoadPushDown {
@Override
public Tuple getNext() throws IOException {
+ mProtoTuple = new ArrayList<Object>();
if (!mRequiredColumnsInitialized) {
if (signature!=null) {
Properties p = UDFContext.getUDFContext().getUDFProperties(this.getClass());
@@ -127,7 +128,6 @@ LoadPushDown {
readField(buf, start, len);
}
Tuple t = mTupleFactory.newTupleNoCopy(mProtoTuple);
- mProtoTuple = null;
return t;
} catch (InterruptedException e) {
int errCode = 6018;
@@ -171,10 +171,6 @@ LoadPushDown {
}
private void readField(byte[] buf, int start, int end) {
- if (mProtoTuple == null) {
- mProtoTuple = new ArrayList<Object>();
- }
-
if (start == end) {
// NULL value
mProtoTuple.add(null);
Modified: hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestPigStorage.java?rev=932484&r1=932483&r2=932484&view=diff
==============================================================================
--- hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestPigStorage.java (original)
+++ hadoop/pig/branches/branch-0.7/test/org/apache/pig/test/TestPigStorage.java Fri Apr 9 16:14:24 2010
@@ -26,46 +26,47 @@ import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Iterator;
+import java.util.Properties;
+import java.util.Map.Entry;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.data.Tuple;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.junit.Before;
import org.junit.Test;
-public class TestPigStorage {
+public class TestPigStorage {
protected final Log log = LogFactory.getLog(getClass());
private static MiniCluster cluster = MiniCluster.buildCluster();
- private static PigServer pigServer = null;
-
- @BeforeClass
- public static void setup() {
- try {
- pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
- } catch (ExecException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
-
- @AfterClass
- public static void shutdown() {
- pigServer.shutdown();
+ @Before
+ public void setup() {
+ // some tests are in map-reduce mode and some in local - so before
+ // each test, we will de-initialize FileLocalizer so that temp files
+ // are created correctly depending on the ExecType in the test.
+ FileLocalizer.setInitialized(false);
}
@Test
- public void testBlockBoundary() {
+ public void testBlockBoundary() throws ExecException {
// This tests PigStorage loader with records exactly
// on the boundary of the file blocks.
+ Properties props = new Properties();
+ for (Entry<Object, Object> entry : cluster.getProperties().entrySet()) {
+ props.put(entry.getKey(), entry.getValue());
+ }
+ props.setProperty("mapred.max.split.size", "20");
+ PigServer pigServer = new PigServer(MAPREDUCE, props);
String[] inputs = {
"abcdefgh1", "abcdefgh2", "abcdefgh3",
"abcdefgh4", "abcdefgh5", "abcdefgh6",
@@ -115,5 +116,32 @@ public class TestPigStorage {
}
}
}
+
+ /**
+ * Test to verify that PigStorage works fine in the following scenario:
+ * The column prune optimization determines only columns 2 and 3 are needed
+ * and there are records in the data which have only 1 column (malformed data).
+ * In this case, PigStorage should return an empty tuple to represent columns
+ * 2 and 3 and {@link POProject} would handle catching any
+ * {@link IndexOutOfBoundsException} resulting from accessing a field in the
+ * tuple and substitute a null.
+ */
+ @Test
+ public void testPruneColumnsWithMissingFields() throws IOException {
+ String inputFileName = "TestPigStorage-testPruneColumnsWithMissingFields-input.txt";
+ Util.createLocalInputFile(
+ inputFileName,
+ new String[] {"1\t2\t3", "4", "5\t6\t7"});
+ PigServer ps = new PigServer(ExecType.LOCAL);
+ String script = "a = load '" + inputFileName + "' as (i:int, j:int, k:int);" +
+ "b = foreach a generate j, k;";
+ Util.registerMultiLineQuery(ps, script);
+ Iterator<Tuple> it = ps.openIterator("b");
+ assertEquals(Util.createTuple(new Integer[] { 2, 3}), it.next());
+ assertEquals(Util.createTuple(new Integer[] { null, null}), it.next());
+ assertEquals(Util.createTuple(new Integer[] { 6, 7}), it.next());
+ assertFalse(it.hasNext());
+
+ }
}