You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/01/14 00:58:47 UTC

svn commit: r1231410 - in /pig/branches/branch-0.9: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ test/org/apache/pig/test/

Author: daijy
Date: Fri Jan 13 23:58:46 2012
New Revision: 1231410

URL: http://svn.apache.org/viewvc?rev=1231410&view=rev
Log:
PIG-2462: getWrappedSplit is incorrectly returning the first split instead of the current split.

Added:
    pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java
Modified:
    pig/branches/branch-0.9/CHANGES.txt
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
    pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java

Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1231410&r1=1231409&r2=1231410&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Fri Jan 13 23:58:46 2012
@@ -40,6 +40,8 @@ PIG-2410: Piggybank does not compile in 
 
 BUG FIXES
 
+PIG-2462: getWrappedSplit is incorrectly returning the first split instead of the current split. (arov via daijy)
+
 PIG-2473: Fix eclipse files for pig 9 (daijy)
 
 PIG-2472: piggybank unit tests write directly to /tmp (thw via daijy)

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1231410&r1=1231409&r2=1231410&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri Jan 13 23:58:46 2012
@@ -224,6 +224,7 @@ public class PigRecordReader extends Rec
         try {
           
 
+            pigSplit.setCurrentIdx(idx);
             curReader =  inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
 
             if (idx > 0) {

Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1231410&r1=1231409&r2=1231410&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Jan 13 23:58:46 2012
@@ -81,6 +81,9 @@ public class PigSplit extends InputSplit
     // index
     private int splitIndex;
     
+    // index (into wrappedSplits) of the split currently being processed
+    private int currentIdx;
+    
     // the flag indicates this is a multi-input join (i.e. join)
     // so that custom Hadoop counters will be created in the 
     // back-end to track the number of records for each input.
@@ -122,6 +125,7 @@ public class PigSplit extends InputSplit
         this.inputIndex = inputIndex;
         this.targetOps = new ArrayList<OperatorKey>(targetOps);
         this.splitIndex = splitIndex;
+        this.currentIdx = 0;
     }
     
     public List<OperatorKey> getTargetOps() {
@@ -135,7 +139,7 @@ public class PigSplit extends InputSplit
      * @return the wrappedSplit
      */
     public InputSplit getWrappedSplit() {
-        return wrappedSplits[0];
+        return wrappedSplits[currentIdx]; // split chosen by the last setCurrentIdx() call; 0 until then
     }
     
     /**
@@ -390,4 +394,8 @@ public class PigSplit extends InputSplit
     public boolean disableCounter() {
         return disableCounter;
     }
+    
+    public void setCurrentIdx(int idx) {
+        this.currentIdx = idx; // no range check: an out-of-range idx only fails later in getWrappedSplit()
+    }
 }

Added: pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java?rev=1231410&view=auto
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java (added)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java Fri Jan 13 23:58:46 2012
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestSplitIndex {
+    private PigServer pigServer;
+    File inputDir;
+    @Before
+    public void setUp() throws Exception{
+        pigServer = new PigServer(ExecType.LOCAL, new Properties());
+        inputDir = File.createTempFile("tmp", "");
+        inputDir.delete();
+        inputDir.mkdir();
+        Util.createLocalInputFile(inputDir.getAbsolutePath()+"/1", new String[] {"1\t2"});
+        Util.createLocalInputFile(inputDir.getAbsolutePath()+"/2", new String[] {"3\t4"});
+    }
+    
+    @Test
+    public void testSplitIndex() throws Exception {
+        pigServer.registerQuery("a = load '" + inputDir + "' using " + SplitSensitiveLoadFunc.class.getName() + "();");
+        Iterator<Tuple> iter = pigServer.openIterator("a");
+        
+        boolean file1exist=false, file2exist=false;
+        Tuple t = iter.next();
+        if (t.get(2).toString().endsWith("/1"))
+            file1exist = true;
+        if (t.get(2).toString().endsWith("/2"))
+            file2exist = true;
+        t = iter.next();
+        if (t.get(2).toString().endsWith("/1"))
+            file1exist = true;
+        if (t.get(2).toString().endsWith("/2"))
+            file2exist = true;
+        if (!file1exist || !file2exist)
+            Assert.fail();
+    }
+    
+    @Test
+    public void testSplitIndexNoCombine() throws Exception {
+        pigServer.getPigContext().getProperties().setProperty("pig.splitCombination", "false");
+        pigServer.registerQuery("a = load '" + inputDir + "' using " + SplitSensitiveLoadFunc.class.getName() + "();");
+        Iterator<Tuple> iter = pigServer.openIterator("a");
+        
+        boolean file1exist=false, file2exist=false;
+        Tuple t = iter.next();
+        if (t.get(2).toString().endsWith("/1"))
+            file1exist = true;
+        if (t.get(2).toString().endsWith("/2"))
+            file2exist = true;
+        t = iter.next();
+        if (t.get(2).toString().endsWith("/1"))
+            file1exist = true;
+        if (t.get(2).toString().endsWith("/2"))
+            file2exist = true;
+        if (!file1exist || !file2exist)
+            Assert.fail();
+    }
+    
+    public static class SplitSensitiveLoadFunc extends PigStorage {
+        Path path = null;
+        public SplitSensitiveLoadFunc() {
+            super();
+        }
+        @Override
+        public void prepareToRead(RecordReader reader, PigSplit split) {
+            in = reader;
+            path = ((FileSplit)split.getWrappedSplit()).getPath();
+        }
+        
+        @Override
+        public Tuple getNext() throws IOException {
+            Tuple myTuple = super.getNext();
+            if (myTuple != null)
+                myTuple.append(path.toString());
+            return myTuple;
+        }
+    }
+}