You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/08/21 05:34:15 UTC
tajo git commit: TAJO-1596: TestPythonFunctions occasionally fails.
Repository: tajo
Updated Branches:
refs/heads/master 0b59a93ba -> f21d5d677
TAJO-1596: TestPythonFunctions occasionally fails.
Closes #705
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f21d5d67
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f21d5d67
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f21d5d67
Branch: refs/heads/master
Commit: f21d5d677c2ba1b9c493498ecb512d1ddcfe2221
Parents: 0b59a93
Author: Jinho Kim <jh...@apache.org>
Authored: Fri Aug 21 12:33:29 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Aug 21 12:33:29 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/master/QueryManager.java | 3 +-
.../src/main/resources/python/controller.py | 3 -
.../function/python/PythonScriptEngine.java | 97 ++++++++++++++------
4 files changed, 75 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2e93f51..8653bbc 100644
--- a/CHANGES
+++ b/CHANGES
@@ -232,6 +232,8 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1596: TestPythonFunctions occasionally fails. (jinho)
+
TAJO-1741: Two tables having same time zone display different timestamps.
(Contributed Jongyoung Park, committed by hyunsik)
http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 8838986..95562ed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -226,8 +226,9 @@ public class QueryManager extends CompositeService {
public boolean startQueryJob(QueryId queryId, AllocationResourceProto allocation) {
if (submittedQueries.get(queryId).allocateToQueryMaster(allocation)) {
- QueryInProgress queryInProgress = submittedQueries.remove(queryId);
+ QueryInProgress queryInProgress = submittedQueries.get(queryId);
runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
+ submittedQueries.remove(queryId);
dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
queryInProgress.getQueryInfo()));
return true;
http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-core/src/main/resources/python/controller.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/python/controller.py b/tajo-core/src/main/resources/python/controller.py
index 126ccdc..8d0f995 100644
--- a/tajo-core/src/main/resources/python/controller.py
+++ b/tajo-core/src/main/resources/python/controller.py
@@ -113,9 +113,6 @@ class PythonStreamingController:
self.stream_error = os.fdopen(sys.stderr.fileno(), 'wb', 0)
self.input_stream = sys.stdin
- # TODO: support controller logging
- self.log_stream = open(output_stream_path, 'a')
- sys.stderr = open(error_stream_path, 'w')
sys.path.append(file_path)
sys.path.append(cache_path)
http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
index 02dff19..2d93aa9 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
@@ -19,6 +19,7 @@
package org.apache.tajo.plan.function.python;
import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -27,12 +28,17 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
import org.apache.tajo.common.TajoDataTypes;
import org.apache.tajo.conf.TajoConf;
import org.apache.tajo.datum.Datum;
-import org.apache.tajo.function.*;
+import org.apache.tajo.function.FunctionInvocation;
+import org.apache.tajo.function.FunctionSignature;
+import org.apache.tajo.function.FunctionSupplement;
+import org.apache.tajo.function.PythonInvocationDesc;
import org.apache.tajo.plan.function.FunctionContext;
import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext;
import org.apache.tajo.plan.function.stream.*;
import org.apache.tajo.storage.Tuple;
import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
import org.apache.tajo.util.FileUtil;
import org.apache.tajo.util.TUtil;
@@ -323,12 +329,26 @@ public class PythonScriptEngine extends TajoScriptEngine {
@Override
public void shutdown() {
- process.destroy();
FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler);
stdin = null;
stdout = stderr = null;
inputHandler = null;
outputHandler = null;
+
+ try {
+ int exitCode = process.waitFor();
+
+ if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+ LOG.warn("Process exit code: " + exitCode);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Process exit code: " + exitCode);
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.warn(e.getMessage(), e);
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug("PythonScriptExecutor shuts down");
}
@@ -485,14 +505,15 @@ public class PythonScriptEngine extends TajoScriptEngine {
try {
inputHandler.putNext(input, inSchema);
stdin.flush();
- } catch (Exception e) {
- throw new RuntimeException("Failed adding input to inputQueue", e);
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
}
- Datum result;
+
+ Datum result = null;
try {
result = outputHandler.getNext().asDatum(0);
- } catch (Exception e) {
- throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
}
return result;
@@ -519,14 +540,36 @@ public class PythonScriptEngine extends TajoScriptEngine {
try {
inputHandler.putNext(methodName, input, inSchema);
stdin.flush();
- } catch (Exception e) {
- throw new RuntimeException("Failed adding input to inputQueue while executing " + methodName + " with " + input, e);
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Failed adding input to inputQueue while executing "
+ + methodName + " with " + input, e));
}
try {
outputHandler.getNext();
} catch (Exception e) {
- throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+ throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
+ }
+ }
+
+ /**
+ * Get the standard error streams of the external process and throw the exception
+ *
+ * @throws RuntimeException
+ */
+ private void throwException(InputStream stderr, RuntimeException e) throws RuntimeException {
+ try {
+ if (stderr.available() > 0) {
+ byte[] bytes = new byte[Math.min(stderr.available(), 100 * StorageUnit.KB)];
+ IOUtils.readFully(stderr, bytes);
+ String message = new String(bytes, Charset.defaultCharset());
+
+ throw new RuntimeException("Python exception caused by: " + message, e);
+ } else {
+ throw e;
+ }
+ } catch (IOException ioe) {
+ throw new RuntimeException(ioe.getMessage(), ioe);
}
}
@@ -540,13 +583,13 @@ public class PythonScriptEngine extends TajoScriptEngine {
try {
inputHandler.putNext("update_context", functionContext);
stdin.flush();
- } catch (Exception e) {
- throw new RuntimeException("Failed adding input to inputQueue", e);
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
}
try {
outputHandler.getNext();
- } catch (Exception e) {
- throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
}
}
@@ -560,13 +603,13 @@ public class PythonScriptEngine extends TajoScriptEngine {
try {
inputHandler.putNext("get_context", EMPTY_INPUT, EMPTY_SCHEMA);
stdin.flush();
- } catch (Exception e) {
- throw new RuntimeException("Failed adding input to inputQueue", e);
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
}
try {
outputHandler.getNext(functionContext);
- } catch (Exception e) {
- throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
}
}
@@ -581,14 +624,16 @@ public class PythonScriptEngine extends TajoScriptEngine {
try {
inputHandler.putNext("get_partial_result", EMPTY_INPUT, EMPTY_SCHEMA);
stdin.flush();
- } catch (Exception e) {
- throw new RuntimeException("Failed adding input to inputQueue", e);
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
}
+ String result = null;
try {
- return outputHandler.getPartialResultString();
- } catch (Exception e) {
- throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+ result = outputHandler.getPartialResultString();
+ } catch (Throwable e) {
+ throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
}
+ return result;
}
/**
@@ -603,13 +648,13 @@ public class PythonScriptEngine extends TajoScriptEngine {
inputHandler.putNext("get_final_result", EMPTY_INPUT, EMPTY_SCHEMA);
stdin.flush();
} catch (Exception e) {
- throw new RuntimeException("Failed adding input to inputQueue", e);
+ throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
}
- Datum result;
+ Datum result = null;
try {
result = outputHandler.getNext().asDatum(0);
} catch (Exception e) {
- throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+ throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
}
return result;