You are viewing a plain-text version of this content. The canonical version is available at the original mailing-list archive page; its hyperlink was lost when this text was extracted.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/08/21 05:34:15 UTC

tajo git commit: TAJO-1596: TestPythonFunctions occasionally fails.

Repository: tajo
Updated Branches:
  refs/heads/master 0b59a93ba -> f21d5d677


TAJO-1596: TestPythonFunctions occasionally fails.

Closes #705


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/f21d5d67
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/f21d5d67
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/f21d5d67

Branch: refs/heads/master
Commit: f21d5d677c2ba1b9c493498ecb512d1ddcfe2221
Parents: 0b59a93
Author: Jinho Kim <jh...@apache.org>
Authored: Fri Aug 21 12:33:29 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Fri Aug 21 12:33:29 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/master/QueryManager.java    |  3 +-
 .../src/main/resources/python/controller.py     |  3 -
 .../function/python/PythonScriptEngine.java     | 97 ++++++++++++++------
 4 files changed, 75 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 2e93f51..8653bbc 100644
--- a/CHANGES
+++ b/CHANGES
@@ -232,6 +232,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1596: TestPythonFunctions occasionally fails. (jinho)
+
     TAJO-1741: Two tables having same time zone display different timestamps.
     (Contributed Jongyoung Park, committed by hyunsik)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 8838986..95562ed 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -226,8 +226,9 @@ public class QueryManager extends CompositeService {
   public boolean startQueryJob(QueryId queryId, AllocationResourceProto allocation) {
 
     if (submittedQueries.get(queryId).allocateToQueryMaster(allocation)) {
-      QueryInProgress queryInProgress = submittedQueries.remove(queryId);
+      QueryInProgress queryInProgress = submittedQueries.get(queryId);
       runningQueries.put(queryInProgress.getQueryId(), queryInProgress);
+      submittedQueries.remove(queryId);
       dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START,
           queryInProgress.getQueryInfo()));
       return true;

http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-core/src/main/resources/python/controller.py
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/python/controller.py b/tajo-core/src/main/resources/python/controller.py
index 126ccdc..8d0f995 100644
--- a/tajo-core/src/main/resources/python/controller.py
+++ b/tajo-core/src/main/resources/python/controller.py
@@ -113,9 +113,6 @@ class PythonStreamingController:
         self.stream_error = os.fdopen(sys.stderr.fileno(), 'wb', 0)
 
         self.input_stream = sys.stdin
-        # TODO: support controller logging
-        self.log_stream = open(output_stream_path, 'a')
-        sys.stderr = open(error_stream_path, 'w')
 
         sys.path.append(file_path)
         sys.path.append(cache_path)

http://git-wip-us.apache.org/repos/asf/tajo/blob/f21d5d67/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
----------------------------------------------------------------------
diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
index 02dff19..2d93aa9 100644
--- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
+++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java
@@ -19,6 +19,7 @@
 package org.apache.tajo.plan.function.python;
 
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -27,12 +28,17 @@ import org.apache.tajo.catalog.proto.CatalogProtos;
 import org.apache.tajo.common.TajoDataTypes;
 import org.apache.tajo.conf.TajoConf;
 import org.apache.tajo.datum.Datum;
-import org.apache.tajo.function.*;
+import org.apache.tajo.function.FunctionInvocation;
+import org.apache.tajo.function.FunctionSignature;
+import org.apache.tajo.function.FunctionSupplement;
+import org.apache.tajo.function.PythonInvocationDesc;
 import org.apache.tajo.plan.function.FunctionContext;
 import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext;
 import org.apache.tajo.plan.function.stream.*;
 import org.apache.tajo.storage.Tuple;
 import org.apache.tajo.storage.VTuple;
+import org.apache.tajo.unit.StorageUnit;
+import org.apache.tajo.util.CommonTestingUtil;
 import org.apache.tajo.util.FileUtil;
 import org.apache.tajo.util.TUtil;
 
@@ -323,12 +329,26 @@ public class PythonScriptEngine extends TajoScriptEngine {
 
   @Override
   public void shutdown() {
-    process.destroy();
     FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler);
     stdin = null;
     stdout = stderr = null;
     inputHandler = null;
     outputHandler = null;
+
+    try {
+      int exitCode = process.waitFor();
+
+      if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) {
+        LOG.warn("Process exit code: " + exitCode);
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Process exit code: " + exitCode);
+        }
+      }
+    } catch (InterruptedException e) {
+      LOG.warn(e.getMessage(), e);
+    }
+
     if (LOG.isDebugEnabled()) {
       LOG.debug("PythonScriptExecutor shuts down");
     }
@@ -485,14 +505,15 @@ public class PythonScriptEngine extends TajoScriptEngine {
     try {
       inputHandler.putNext(input, inSchema);
       stdin.flush();
-    } catch (Exception e) {
-      throw new RuntimeException("Failed adding input to inputQueue", e);
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
     }
-    Datum result;
+
+    Datum result = null;
     try {
       result = outputHandler.getNext().asDatum(0);
-    } catch (Exception e) {
-      throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
     }
 
     return result;
@@ -519,14 +540,36 @@ public class PythonScriptEngine extends TajoScriptEngine {
     try {
       inputHandler.putNext(methodName, input, inSchema);
       stdin.flush();
-    } catch (Exception e) {
-      throw new RuntimeException("Failed adding input to inputQueue while executing " + methodName + " with " + input, e);
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Failed adding input to inputQueue while executing "
+          + methodName + " with " + input, e));
     }
 
     try {
       outputHandler.getNext();
     } catch (Exception e) {
-      throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+      throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
+    }
+  }
+
+  /**
+   * Get the standard error streams of the external process and throw the exception
+   *
+   * @throws RuntimeException
+   */
+  private void throwException(InputStream stderr, RuntimeException e) throws RuntimeException {
+    try {
+      if (stderr.available() > 0) {
+        byte[] bytes = new byte[Math.min(stderr.available(), 100 * StorageUnit.KB)];
+        IOUtils.readFully(stderr, bytes);
+        String message = new String(bytes, Charset.defaultCharset());
+
+        throw new RuntimeException("Python exception caused by: " + message, e);
+      } else {
+        throw e;
+      }
+    } catch (IOException ioe) {
+      throw new RuntimeException(ioe.getMessage(), ioe);
     }
   }
 
@@ -540,13 +583,13 @@ public class PythonScriptEngine extends TajoScriptEngine {
     try {
       inputHandler.putNext("update_context", functionContext);
       stdin.flush();
-    } catch (Exception e) {
-      throw new RuntimeException("Failed adding input to inputQueue", e);
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
     }
     try {
       outputHandler.getNext();
-    } catch (Exception e) {
-      throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
     }
   }
 
@@ -560,13 +603,13 @@ public class PythonScriptEngine extends TajoScriptEngine {
     try {
       inputHandler.putNext("get_context", EMPTY_INPUT, EMPTY_SCHEMA);
       stdin.flush();
-    } catch (Exception e) {
-      throw new RuntimeException("Failed adding input to inputQueue", e);
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
     }
     try {
       outputHandler.getNext(functionContext);
-    } catch (Exception e) {
-      throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
     }
   }
 
@@ -581,14 +624,16 @@ public class PythonScriptEngine extends TajoScriptEngine {
     try {
       inputHandler.putNext("get_partial_result", EMPTY_INPUT, EMPTY_SCHEMA);
       stdin.flush();
-    } catch (Exception e) {
-      throw new RuntimeException("Failed adding input to inputQueue", e);
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
     }
+    String result = null;
     try {
-      return outputHandler.getPartialResultString();
-    } catch (Exception e) {
-      throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+      result = outputHandler.getPartialResultString();
+    } catch (Throwable e) {
+      throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
     }
+    return result;
   }
 
   /**
@@ -603,13 +648,13 @@ public class PythonScriptEngine extends TajoScriptEngine {
       inputHandler.putNext("get_final_result", EMPTY_INPUT, EMPTY_SCHEMA);
       stdin.flush();
     } catch (Exception e) {
-      throw new RuntimeException("Failed adding input to inputQueue", e);
+      throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e));
     }
-    Datum result;
+    Datum result = null;
     try {
       result = outputHandler.getNext().asDatum(0);
     } catch (Exception e) {
-      throw new RuntimeException("Problem getting output: " + e.getMessage(), e);
+      throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e));
     }
 
     return result;