You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by kn...@apache.org on 2017/04/28 14:32:38 UTC
svn commit: r1793098 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/tools/parameters/PreprocessorContext.java
test/org/apache/pig/tools/parameters/
test/org/apache/pig/tools/parameters/TestPreprocessorContext.java
Author: knoguchi
Date: Fri Apr 28 14:32:38 2017
New Revision: 1793098
URL: http://svn.apache.org/viewvc?rev=1793098&view=rev
Log:
PIG-5226: PreprocessorContext.java can deadlock forever with large stderr (jtolar via knoguchi)
Added:
pig/trunk/test/org/apache/pig/tools/parameters/
pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1793098&r1=1793097&r2=1793098&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Fri Apr 28 14:32:38 2017
@@ -97,6 +97,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5226: PreprocessorContext.java can deadlock forever with large stderr (jtolar via knoguchi)
+
PIG-5221: More fs.default.name deprecation warnings (wattsinabox via daijy)
PIG-5222: Fix Junit Deprecations (wattsinabox via daijy)
Modified: pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java?rev=1793098&r1=1793097&r2=1793098&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java (original)
+++ pig/trunk/src/org/apache/pig/tools/parameters/PreprocessorContext.java Fri Apr 28 14:32:38 2017
@@ -25,16 +25,22 @@ package org.apache.pig.tools.parameters;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
-import java.io.InputStreamReader;
+import java.io.InputStream;
import java.io.StringReader;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.Shell;
@@ -203,6 +209,27 @@ public class PreprocessorContext {
paramval_put(key, sub_val);
}
+    /**
+     * Callable that drains an input stream into a single String and then
+     * closes the stream, so that it can be consumed on a worker thread.
+     */
+    public static class CallableStreamReader implements Callable<String> {
+        private final InputStream source;
+
+        public CallableStreamReader(InputStream stream) {
+            this.source = stream;
+        }
+
+        /**
+         * Reads the wrapped stream to its end.
+         *
+         * @return everything read from the stream, as produced by
+         *         IOUtils.toString(InputStream)
+         * @throws RuntimeException wrapping any IOException raised while reading
+         */
+        @Override
+        public String call() {
+            final String contents;
+            try {
+                contents = IOUtils.toString(source);
+            } catch (IOException ioe) {
+                throw new RuntimeException("IO Exception while executing shell command: " + ioe.getMessage() , ioe);
+            } finally {
+                // Always release the stream, whether or not the read succeeded.
+                IOUtils.closeQuietly(source);
+            }
+            return contents;
+        }
+    }
/*
* executes the 'cmd' in shell and returns result
@@ -235,40 +262,21 @@ public class PreprocessorContext {
throw rte;
}
- BufferedReader br = null;
- try{
- InputStreamReader isr = new InputStreamReader(p.getInputStream());
- br = new BufferedReader(isr);
- String line=null;
- StringBuilder sb = new StringBuilder();
- while ( (line = br.readLine()) != null){
- sb.append(line);
- sb.append("\n");
- }
- streamData = sb.toString();
- } catch (IOException e){
- RuntimeException rte = new RuntimeException("IO Exception while executing shell command : "+e.getMessage() , e);
- throw rte;
- } finally {
- if (br != null) try {br.close();} catch(Exception e) {}
- }
+ // Read stdout and stderr in separate threads to avoid deadlock due to pipe buffer size
+ ExecutorService executorService = Executors.newFixedThreadPool(2);
+ Future<String> futureOut = executorService.submit(new CallableStreamReader(p.getInputStream()));
+ Future<String> futureErr = executorService.submit(new CallableStreamReader(p.getErrorStream()));
try {
- InputStreamReader isr = new InputStreamReader(p.getErrorStream());
- br = new BufferedReader(isr);
- String line=null;
- StringBuilder sb = new StringBuilder();
- while ( (line = br.readLine()) != null ) {
- sb.append(line);
- sb.append("\n");
- }
- streamError = sb.toString();
+ streamData = futureOut.get();
+ streamError = futureErr.get();
log.debug("Error stream while executing shell command : " + streamError);
- } catch (Exception e) {
- RuntimeException rte = new RuntimeException("IO Exception while executing shell command : "+e.getMessage() , e);
- throw rte;
+ } catch (InterruptedException e) {
+ throw new RuntimeException("InterruptedException while executing shell command : " + e.getMessage() , e);
+ } catch (ExecutionException e) {
+ throw new RuntimeException("ExecutionException while executing shell command : " + e.getMessage(), e);
} finally {
- if (br != null) try {br.close();} catch(Exception e) {}
+ executorService.shutdownNow();
}
int exitVal;
Added: pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java?rev=1793098&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java (added)
+++ pig/trunk/test/org/apache/pig/tools/parameters/TestPreprocessorContext.java Fri Apr 28 14:32:38 2017
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.tools.parameters;
+
+import org.apache.hadoop.util.Shell;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.Assume;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.util.Map;
+import java.util.regex.Pattern;
+
+/**
+ * Tests for shell-command parameter handling in {@link PreprocessorContext}
+ * (added with PIG-5226).
+ */
+public class TestPreprocessorContext {
+
+    /**
+     * A backtick-quoted command is executed and its output becomes the
+     * parameter value.
+     */
+    @Test
+    public void testProcessShellCmd() throws ParameterSubstitutionException, FrontendException {
+        PreprocessorContext ctx = new PreprocessorContext(0);
+        String cmd = "echo hello";
+        ctx.processShellCmd("some_value", "`" + cmd + "`");
+
+        Map<String, String> paramVal = ctx.getParamVal();
+        assertEquals("hello", paramVal.get("some_value"));
+    }
+
+    /**
+     * Regression test for PIG-5226: a command that writes 10000 lines to
+     * stderr before printing its result must still complete and yield the
+     * stdout value.
+     */
+    @Test
+    public void testProcessShellCmdBigStderr() throws ParameterSubstitutionException, FrontendException {
+        // This test probably doesn't work on Windows, but should work elsewhere
+        Assume.assumeFalse(Shell.WINDOWS);
+
+        PreprocessorContext ctx = new PreprocessorContext(0);
+        String cmd = "bash -c 'i=0; while [ \"\\$i\" -lt 10000 ]; do echo long-stderr-output >&2; " +
+                "i=\\$((i+1)); done; echo hello'";
+        ctx.processShellCmd("some_value", "`" + cmd + "`");
+
+        Map<String, String> paramVal = ctx.getParamVal();
+        assertEquals("hello", paramVal.get("some_value"));
+    }
+
+    /**
+     * A command that exits with a non-zero status must raise a
+     * RuntimeException whose message reports the exit code.
+     */
+    @Test
+    public void testFailingCommand() throws ParameterSubstitutionException, FrontendException {
+        // Track whether the exception was seen; without this the test would
+        // pass silently if the failing command raised nothing at all.
+        boolean thrown = false;
+        try {
+            PreprocessorContext ctx = new PreprocessorContext(0);
+            String cmd = "exit 1";
+            ctx.processShellCmd("some_value", "`" + cmd + "`");
+        } catch (RuntimeException e) {
+            thrown = true;
+            assertTrue(Pattern.compile("Error executing shell command:.*exit code.*")
+                    .matcher(e.getMessage()).matches()
+            );
+        }
+        assertTrue("Expected a RuntimeException for a shell command that exits with a non-zero status", thrown);
+    }
+}