You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2015/02/25 01:17:53 UTC

svn commit: r1662144 - in /pig/trunk: CHANGES.txt src/org/apache/pig/impl/io/ReadToEndLoader.java test/org/apache/pig/test/TestReadToEndLoader.java

Author: daijy
Date: Wed Feb 25 00:17:52 2015
New Revision: 1662144

URL: http://svn.apache.org/r1662144
Log:
PIG-4431: ReadToEndLoader does not close the record reader for the last input split

Added:
    pig/trunk/test/org/apache/pig/test/TestReadToEndLoader.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1662144&r1=1662143&r2=1662144&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Feb 25 00:17:52 2015
@@ -48,6 +48,8 @@ PIG-4333: Split BigData tests into multi
  
 BUG FIXES
 
+PIG-4431: ReadToEndLoader does not close the record reader for the last input split (rdsr via daijy)
+
 PIG-4426: RowNumber(simple) Rank not producing correct results (knoguchi)
 
 PIG-4433: Loading bigdecimal in nested tuple does not work (kpriceyahoo via daijy)

Modified: pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1662144&r1=1662143&r2=1662144&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/ReadToEndLoader.java Wed Feb 25 00:17:52 2015
@@ -196,13 +196,14 @@ public class ReadToEndLoader extends Loa
 
     private boolean initializeReader() throws IOException, 
     InterruptedException {
+        // Close the previous reader first
+        if(reader != null) {
+            reader.close();
+        }
         if(curSplitIndex > inpSplits.size() - 1) {
             // past the last split, we are done
             return false;
         }
-        if(reader != null){
-            reader.close();
-        }
         InputSplit curSplit = inpSplits.get(curSplitIndex);
         TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf, 
                 new TaskAttemptID());

Added: pig/trunk/test/org/apache/pig/test/TestReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestReadToEndLoader.java?rev=1662144&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestReadToEndLoader.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestReadToEndLoader.java Wed Feb 25 00:17:52 2015
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.impl.io.ReadToEndLoader;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+import static org.mockito.Mockito.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+public class TestReadToEndLoader {
+    @Test
+    public void testIsReaderForLastSplitClosed() throws Exception {
+        final LoadFunc loadFunc = mock(LoadFunc.class);
+        final InputFormat inputFormat = mock(InputFormat.class);
+        final RecordReader recordReader = mock(RecordReader.class);
+        final InputSplit inputSplit = mock(InputSplit.class);
+        // Define behavior
+        when(loadFunc.getInputFormat()).thenReturn(inputFormat);
+        when(inputFormat.createRecordReader(
+                any(InputSplit.class), any(TaskAttemptContext.class))).thenReturn(recordReader);
+        when(inputFormat.getSplits(any(JobContext.class))).thenReturn(Arrays.asList(inputSplit));
+        Configuration conf = new Configuration();
+        ReadToEndLoader loader = new ReadToEndLoader(loadFunc, conf, "loc", 0);
+        // This will return null since we haven't specified any behavior for this method
+        Assert.assertNull(loader.getNext());
+        // Verify that RecordReader.close for the last input split is called once
+        verify(recordReader, times(1)).close();
+    }
+}