You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2017/04/28 14:32:38 UTC

svn commit: r1793098 - in /pig/trunk: CHANGES.txt src/org/apache/pig/tools/parameters/PreprocessorContext.java test/org/apache/pig/tools/parameters/ test/org/apache/pig/tools/parameters/TestPreprocessorContext.java

Author: knoguchi
Date: Fri Apr 28 14:32:38 2017
New Revision: 1793098

URL: http://svn.apache.org/viewvc?rev=1793098&view=rev
Log:
PIG-5226: PreprocessorContext.java can deadlock forever with large stderr (jtolar via knoguchi)

Added:
    pig/trunk/test/org/apache/pig/tools/parameters/
    pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1793098&r1=1793097&r2=1793098&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 28 14:32:38 2017
@@ -97,6 +97,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5226: PreprocessorContext.java can deadlock forever with large stderr (jtolar via knoguchi)
+
 PIG-5221: More fs.default.name deprecation warnings (wattsinabox via daijy)
 
 PIG-5222: Fix Junit Deprecations (wattsinabox via daijy)

Modified: pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1793098&r1=1793097&r2=1793098&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original)
+++ pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java Fri Apr 28 14:32:38 2017
@@ -25,16 +25,22 @@ package org.apache.pig.tools.parameters;
 import java.io.BufferedReader;
 import java.io.FileReader;
 import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.InputStream;
 import java.io.StringReader;
 import java.util.ArrayDeque;
 import java.util.Deque;
 import java.util.Hashtable;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.Shell;
@@ -203,6 +209,27 @@ public class PreprocessorContext {
         paramval_put(key, sub_val);
     }
 
+    /**
+     * Slurp in an entire input stream and close it.
+     */
+    public static class CallableStreamReader implements Callable<String> {
+        private final InputStream inputStream;
+
+        public CallableStreamReader(InputStream stream) {
+            inputStream = stream;
+        }
+
+        @Override
+        public String call() {
+            try {
+                return IOUtils.toString(inputStream);
+            } catch (IOException e) {
+                throw new RuntimeException("IO Exception while executing shell command: " + e.getMessage() , e);
+            } finally {
+                IOUtils.closeQuietly(inputStream);
+            }
+        }
+    }
 
     /*
      * executes the 'cmd' in shell and returns result
@@ -235,40 +262,21 @@ public class PreprocessorContext {
             throw rte;
         }
 
-        BufferedReader br = null;
-        try{
-            InputStreamReader isr = new InputStreamReader(p.getInputStream());
-            br = new BufferedReader(isr);
-            String line=null;
-            StringBuilder sb = new StringBuilder();
-            while ( (line = br.readLine()) != null){
-                sb.append(line);
-                sb.append("\n");
-            }
-            streamData = sb.toString();
-        } catch (IOException e){
-            RuntimeException rte = new RuntimeException("IO Exception while executing shell command : "+e.getMessage() , e);
-            throw rte;
-        } finally {
-            if (br != null) try {br.close();} catch(Exception e) {}
-        }
+        // Read stdout and stderr in separate threads to avoid deadlock due to pipe buffer size
+        ExecutorService executorService = Executors.newFixedThreadPool(2);
+        Future<String> futureOut = executorService.submit(new CallableStreamReader(p.getInputStream()));
+        Future<String> futureErr = executorService.submit(new CallableStreamReader(p.getErrorStream()));
 
         try {
-            InputStreamReader isr = new InputStreamReader(p.getErrorStream());
-            br = new BufferedReader(isr);
-            String line=null;
-            StringBuilder sb = new StringBuilder();
-            while ( (line = br.readLine()) != null ) {
-                sb.append(line);
-                sb.append("\n");
-            }
-            streamError = sb.toString();
+            streamData = futureOut.get();
+            streamError = futureErr.get();
             log.debug("Error stream while executing shell command : " + streamError);
-        } catch (Exception e) {
-            RuntimeException rte = new RuntimeException("IO Exception while executing shell command : "+e.getMessage() , e);
-            throw rte;
+        } catch (InterruptedException e) {
+            throw new RuntimeException("InterruptedException while executing shell command : " + e.getMessage() , e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException("ExecutionException while executing shell command : " + e.getMessage(), e);
         } finally {
-            if (br != null) try {br.close();} catch(Exception e) {}
+            executorService.shutdownNow();
         }
 
         int exitVal;

Added: pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java?rev=1793098&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java (added)
+++ pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java Fri Apr 28 14:32:38 2017
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.parameters;
+
+import org.apache.hadoop.util.Shell;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+public class TestPreprocessorContext {
+
+    @Test
+    public void testProcessShellCmd() throws ParameterSubstitutionException, FrontendException {
+        PreprocessorContext ctx = new PreprocessorContext(0);
+        String cmd = "echo hello";
+        ctx.processShellCmd("some_value", "`" + cmd + "`");
+
+        Map<String, String> paramVal = ctx.getParamVal();
+        assertEquals("hello", paramVal.get("some_value"));
+    }
+
+    @Test
+    public void testProcessShellCmdBigStderr() throws ParameterSubstitutionException, FrontendException {
+        // This test probably doesn't work on Windows, but should work elsewhere
+        Assume.assumeFalse(Shell.WINDOWS);
+
+        PreprocessorContext ctx = new PreprocessorContext(0);
+        String cmd = "bash -c 'i=0; while [ \"\\$i\" -lt 10000 ]; do echo long-stderr-output >&2; " +
+                "i=\\$((i+1)); done; echo hello'";
+        ctx.processShellCmd("some_value", "`" + cmd + "`");
+
+        Map<String, String> paramVal = ctx.getParamVal();
+        assertEquals("hello", paramVal.get("some_value"));
+    }
+
+    @Test
+    public void testFailingCommand() throws ParameterSubstitutionException, FrontendException {
+        try {
+            PreprocessorContext ctx = new PreprocessorContext(0);
+            String cmd = "exit 1";
+            ctx.processShellCmd("some_value", "`" + cmd + "`");
+        } catch (RuntimeException e) {
+            assertTrue(Pattern.compile("Error executing shell command:.*exit code.*")
+                    .matcher(e.getMessage()).matches()
+            );
+        }
+    }
+}