You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2012/01/14 00:58:47 UTC
svn commit: r1231410 - in /pig/branches/branch-0.9: ./
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
test/org/apache/pig/test/
Author: daijy
Date: Fri Jan 13 23:58:46 2012
New Revision: 1231410
URL: http://svn.apache.org/viewvc?rev=1231410&view=rev
Log:
PIG-2462: getWrappedSplit is incorrectly returning the first split instead of the current split.
Added:
pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java
Modified:
pig/branches/branch-0.9/CHANGES.txt
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
Modified: pig/branches/branch-0.9/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/CHANGES.txt?rev=1231410&r1=1231409&r2=1231410&view=diff
==============================================================================
--- pig/branches/branch-0.9/CHANGES.txt (original)
+++ pig/branches/branch-0.9/CHANGES.txt Fri Jan 13 23:58:46 2012
@@ -40,6 +40,8 @@ PIG-2410: Piggybank does not compile in
BUG FIXES
+PIG-2462: getWrappedSplit is incorrectly returning the first split instead of the current split. (arov via daijy)
+
PIG-2473: Fix eclipse files for pig 9 (daijy)
PIG-2472: piggybank unit tests write directly to /tmp (thw via daijy)
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java?rev=1231410&r1=1231409&r2=1231410&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigRecordReader.java Fri Jan 13 23:58:46 2012
@@ -224,6 +224,7 @@ public class PigRecordReader extends Rec
try {
+ pigSplit.setCurrentIdx(idx);
curReader = inputformat.createRecordReader(pigSplit.getWrappedSplit(idx), context);
if (idx > 0) {
Modified: pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java?rev=1231410&r1=1231409&r2=1231410&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSplit.java Fri Jan 13 23:58:46 2012
@@ -81,6 +81,9 @@ public class PigSplit extends InputSplit
// index
private int splitIndex;
+ // index of the current split being processed
+ private int currentIdx;
+
// the flag indicates this is a multi-input join (i.e. join)
// so that custom Hadoop counters will be created in the
// back-end to track the number of records for each input.
@@ -122,6 +125,7 @@ public class PigSplit extends InputSplit
this.inputIndex = inputIndex;
this.targetOps = new ArrayList<OperatorKey>(targetOps);
this.splitIndex = splitIndex;
+ this.currentIdx = 0;
}
public List<OperatorKey> getTargetOps() {
@@ -135,7 +139,7 @@ public class PigSplit extends InputSplit
* @return the wrappedSplit
*/
public InputSplit getWrappedSplit() {
- return wrappedSplits[0];
+ return wrappedSplits[currentIdx];
}
/**
@@ -390,4 +394,8 @@ public class PigSplit extends InputSplit
public boolean disableCounter() {
return disableCounter;
}
+
+ public void setCurrentIdx(int idx) {
+ this.currentIdx = idx;
+ }
}
Added: pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java?rev=1231410&view=auto
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java (added)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestSplitIndex.java Fri Jan 13 23:58:46 2012
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class TestSplitIndex {
+ private PigServer pigServer;
+ File inputDir;
+ @Before
+ public void setUp() throws Exception{
+ pigServer = new PigServer(ExecType.LOCAL, new Properties());
+ inputDir = File.createTempFile("tmp", "");
+ inputDir.delete();
+ inputDir.mkdir();
+ Util.createLocalInputFile(inputDir.getAbsolutePath()+"/1", new String[] {"1\t2"});
+ Util.createLocalInputFile(inputDir.getAbsolutePath()+"/2", new String[] {"3\t4"});
+ }
+
+ @Test
+ public void testSplitIndex() throws Exception {
+ pigServer.registerQuery("a = load '" + inputDir + "' using " + SplitSensitiveLoadFunc.class.getName() + "();");
+ Iterator<Tuple> iter = pigServer.openIterator("a");
+
+ boolean file1exist=false, file2exist=false;
+ Tuple t = iter.next();
+ if (t.get(2).toString().endsWith("/1"))
+ file1exist = true;
+ if (t.get(2).toString().endsWith("/2"))
+ file2exist = true;
+ t = iter.next();
+ if (t.get(2).toString().endsWith("/1"))
+ file1exist = true;
+ if (t.get(2).toString().endsWith("/2"))
+ file2exist = true;
+ if (!file1exist || !file2exist)
+ Assert.fail();
+ }
+
+ @Test
+ public void testSplitIndexNoCombine() throws Exception {
+ pigServer.getPigContext().getProperties().setProperty("pig.splitCombination", "false");
+ pigServer.registerQuery("a = load '" + inputDir + "' using " + SplitSensitiveLoadFunc.class.getName() + "();");
+ Iterator<Tuple> iter = pigServer.openIterator("a");
+
+ boolean file1exist=false, file2exist=false;
+ Tuple t = iter.next();
+ if (t.get(2).toString().endsWith("/1"))
+ file1exist = true;
+ if (t.get(2).toString().endsWith("/2"))
+ file2exist = true;
+ t = iter.next();
+ if (t.get(2).toString().endsWith("/1"))
+ file1exist = true;
+ if (t.get(2).toString().endsWith("/2"))
+ file2exist = true;
+ if (!file1exist || !file2exist)
+ Assert.fail();
+ }
+
+ public static class SplitSensitiveLoadFunc extends PigStorage {
+ Path path = null;
+ public SplitSensitiveLoadFunc() {
+ super();
+ }
+ @Override
+ public void prepareToRead(RecordReader reader, PigSplit split) {
+ in = reader;
+ path = ((FileSplit)split.getWrappedSplit()).getPath();
+ }
+
+ @Override
+ public Tuple getNext() throws IOException {
+ Tuple myTuple = super.getNext();
+ if (myTuple != null)
+ myTuple.append(path.toString());
+ return myTuple;
+ }
+ }
+}