You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by ji...@apache.org on 2014/08/01 07:59:02 UTC

[09/14] git commit: TAJO-980: execution page in Web UI broken.

TAJO-980: execution page in Web UI broken.

Closes #97


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/fe870851
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/fe870851
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/fe870851

Branch: refs/heads/index_support
Commit: fe870851934e76cde745f84b0b8061480031a7d1
Parents: b49ff30
Author: Hyunsik Choi <hy...@apache.org>
Authored: Tue Jul 29 11:39:23 2014 +0900
Committer: Hyunsik Choi <hy...@apache.org>
Committed: Tue Jul 29 11:39:23 2014 +0900

----------------------------------------------------------------------
 CHANGES                                         |   2 +
 .../tajo/webapp/QueryExecutorServlet.java       | 200 ++++++++++++-------
 2 files changed, 134 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/fe870851/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 95747a1..4390177 100644
--- a/CHANGES
+++ b/CHANGES
@@ -97,6 +97,8 @@ Release 0.9.0 - unreleased
 
   BUG FIXES
 
+    TAJO-980: execution page in Web UI broken. (hyunsik)
+
     TAJO-952: Wrong default partition volume config. (Mai Hai Thanh via jihoon)
 
     TAJO-974: Eliminate unexpected case condition in SubQuery. (Hyoungjun Kim 

http://git-wip-us.apache.org/repos/asf/tajo/blob/fe870851/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
index 3cb7d25..8b849ca 100644
--- a/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
+++ b/tajo-core/src/main/java/org/apache/tajo/webapp/QueryExecutorServlet.java
@@ -1,9 +1,11 @@
 package org.apache.tajo.webapp;
 
+import com.google.protobuf.ServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.tajo.QueryId;
+import org.apache.tajo.QueryIdFactory;
 import org.apache.tajo.TajoProtos;
 import org.apache.tajo.catalog.CatalogUtil;
 import org.apache.tajo.catalog.TableDesc;
@@ -26,6 +28,7 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.sql.ResultSet;
 import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -159,7 +162,7 @@ public class QueryExecutorServlet extends HttpServlet {
             return;
           }
           returnValue.put("numOfRows", queryRunner.numOfRows);
-          returnValue.put("resultSize", queryRunner.resultSize);
+          returnValue.put("resultSize", queryRunner.resultRows);
           returnValue.put("resultData", queryRunner.queryResult);
           returnValue.put("resultColumns", queryRunner.columnNames);
           returnValue.put("runningTime", JSPUtil.getElapsedTime(queryRunner.startTime, queryRunner.finishTime));
@@ -239,12 +242,12 @@ public class QueryExecutorServlet extends HttpServlet {
 
     String queryRunnerId;
 
-    ClientProtos.SubmitQueryResponse queryRespons;
+    ClientProtos.SubmitQueryResponse response;
     AtomicBoolean running = new AtomicBoolean(true);
     AtomicBoolean stop = new AtomicBoolean(false);
     QueryId queryId;
     String query;
-    long resultSize;
+    long resultRows;
     int sizeLimit;
     long numOfRows;
     Exception error;
@@ -268,60 +271,110 @@ public class QueryExecutorServlet extends HttpServlet {
     public void run() {
       startTime = System.currentTimeMillis();
       try {
-        queryRespons = tajoClient.executeQuery(query);
-        if (queryRespons.getResultCode() == ClientProtos.ResultCode.OK) {
-          QueryId queryId = null;
-          try {
-            queryId = new QueryId(queryRespons.getQueryId());
+        response = tajoClient.executeQuery(query);
+
+        if (response == null) {
+          LOG.error("Internal Error: SubmissionResponse is NULL");
+          error = new Exception("Internal Error: SubmissionResponse is NULL");
+
+        } else if (response.getResultCode() == ClientProtos.ResultCode.OK) {
+          if (response.getIsForwarded()) {
+            queryId = new QueryId(response.getQueryId());
             getQueryResult(queryId);
-          } finally {
-            if (queryId != null) {
-              tajoClient.closeQuery(queryId);
+          } else {
+            if (!response.hasTableDesc() && !response.hasResultSet()) {
+            } else {
+              getSimpleQueryResult(response);
             }
+
+            progress.set(100);
           }
-        } else {
-          LOG.error("queryRespons.getResultCode() not OK:" + queryRespons.getResultCode());
-          error = new Exception("queryRespons.getResultCode() not OK:" + queryRespons.getResultCode());
         }
       } catch (Exception e) {
         LOG.error(e.getMessage(), e);
         error = e;
       } finally {
         running.set(false);
+
         finishTime = System.currentTimeMillis();
+
+        if (queryId != null) {
+          tajoClient.closeQuery(queryId);
+        }
       }
     }
 
-    private void getQueryResult(QueryId tajoQueryId) {
-      // query execute
+    private void getSimpleQueryResult(ClientProtos.SubmitQueryResponse response) {
+      ResultSet res = null;
       try {
-        QueryStatus status = null;
+        QueryId queryId = new QueryId(response.getQueryId());
+        TableDesc desc = new TableDesc(response.getTableDesc());
 
-        while (!stop.get()) {
+        if (response.getMaxRowNum() < 0 && queryId.equals(QueryIdFactory.NULL_QUERY_ID)) {
+          // non-forwarded INSERT INTO query does not have any query id.
+          // In this case, it just returns succeeded query information without printing the query results.
+        } else {
+          res = TajoClient.createResultSet(tajoClient, response);
+          MakeResultText(res, desc);
+        }
+        progress.set(100);
+      } catch (Exception e) {
+        LOG.error(e.getMessage(), e);
+        error = e;
+      } finally {
+        if (res != null) {
           try {
-            Thread.sleep(1000);
-          } catch(InterruptedException e) {
-            break;
-          }
-          status = tajoClient.getQueryStatus(tajoQueryId);
-          if (status.getState() == TajoProtos.QueryState.QUERY_MASTER_INIT
-              || status.getState() == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED) {
-            continue;
+            res.close();
+          } catch (SQLException e) {
           }
+        }
+      }
+    }
 
-          if (status.getState() == TajoProtos.QueryState.QUERY_RUNNING
-              || status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
-            int progressValue = (int) (status.getProgress() * 100.0f);
-            if(progressValue == 100)  {
-              progressValue = 99;
-            }
-            progress.set(progressValue);
-          }
-          if (status.getState() != TajoProtos.QueryState.QUERY_RUNNING
-              && status.getState() != TajoProtos.QueryState.QUERY_NOT_ASSIGNED) {
-            break;
+    private QueryStatus waitForComplete(QueryId queryid) throws ServiceException {
+      QueryStatus status = null;
+
+      while (!stop.get()) {
+
+        try {
+          Thread.sleep(150);
+        } catch(InterruptedException e) {
+          break;
+        }
+
+        status = tajoClient.getQueryStatus(queryid);
+        if (status.getState() == TajoProtos.QueryState.QUERY_MASTER_INIT
+            || status.getState() == TajoProtos.QueryState.QUERY_MASTER_LAUNCHED) {
+          continue;
+        }
+
+        if (status.getState() == TajoProtos.QueryState.QUERY_RUNNING
+            || status.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) {
+          int progressValue = (int) (status.getProgress() * 100.0f);
+          if(progressValue == 100)  {
+            progressValue = 99;
           }
+          progress.set(progressValue);
         }
+        if (status.getState() != TajoProtos.QueryState.QUERY_RUNNING
+            && status.getState() != TajoProtos.QueryState.QUERY_NOT_ASSIGNED) {
+          break;
+        }
+
+        try {
+          Thread.sleep(100);
+        } catch(InterruptedException e) {
+          break;
+        }
+      }
+
+      return status;
+    }
+
+    private void getQueryResult(QueryId tajoQueryId) {
+      // query execute
+      try {
+        QueryStatus status = waitForComplete(tajoQueryId);
 
         if(status == null) {
           LOG.error("Query Status is null");
@@ -344,44 +397,21 @@ public class QueryExecutorServlet extends HttpServlet {
                 tajoClient.getConf().setVar(TajoConf.ConfVars.USERNAME, response.getTajoUserName());
                 res = new TajoResultSet(tajoClient, queryId, tajoClient.getConf(), desc);
 
-                ResultSetMetaData rsmd = res.getMetaData();
-                resultSize = desc.getStats().getNumBytes();
-                LOG.info("Tajo Query Result: " + desc.getPath() + "\n");
-
-                int numOfColumns = rsmd.getColumnCount();
-                for(int i = 0; i < numOfColumns; i++) {
-                  columnNames.add(rsmd.getColumnName(i + 1));
-                }
-                queryResult = new ArrayList<List<Object>>();
-
-                if(sizeLimit < resultSize) {
-                    numOfRows = (long)((float)(desc.getStats().getNumRows()) * ((float)sizeLimit / (float)resultSize));
-                } else {
-                    numOfRows = desc.getStats().getNumRows();
-                }
-                int rowCount = 0;
-                boolean hasMoreData = false;
-                while (res.next()) {
-                  if(rowCount > numOfRows) {
-                    hasMoreData = true;
-                    break;
-                  }
-                  List<Object> row = new ArrayList<Object>();
-                  for(int i = 0; i < numOfColumns; i++) {
-                    row.add(res.getObject(i + 1).toString());
-                  }
-                  queryResult.add(row);
-                  rowCount++;
+                MakeResultText(res, desc);
 
-                }
               } finally {
                 if (res != null) {
                   res.close();
                 }
                 progress.set(100);
               }
-            } else {
-              error = new Exception(queryId + " no result");
+            } else { // CTAS or INSERT (OVERWRITE) INTO
+              progress.set(100);
+              try {
+                tajoClient.closeQuery(queryId);
+              } catch (Exception e) {
+                LOG.warn(e);
+              }
             }
           }
         }
@@ -390,5 +420,39 @@ public class QueryExecutorServlet extends HttpServlet {
         error = e;
       }
     }
+
+    private void MakeResultText(ResultSet res, TableDesc desc) throws SQLException {
+      ResultSetMetaData rsmd = res.getMetaData();
+      resultRows = desc.getStats() == null ? 0 : desc.getStats().getNumRows();
+      if (resultRows == 0) {
+        resultRows = 1000;
+      }
+      LOG.info("Tajo Query Result: " + desc.getPath() + "\n");
+
+      int numOfColumns = rsmd.getColumnCount();
+      for(int i = 0; i < numOfColumns; i++) {
+        columnNames.add(rsmd.getColumnName(i + 1));
+      }
+      queryResult = new ArrayList<List<Object>>();
+
+      if(sizeLimit < resultRows) {
+        numOfRows = (long)((float)(resultRows) * ((float)sizeLimit / (float) resultRows));
+      } else {
+        numOfRows = resultRows;
+      }
+
+      int rowCount = 0;
+      while (res.next()) {
+        if(rowCount > numOfRows) {
+          break;
+        }
+        List<Object> row = new ArrayList<Object>();
+        for(int i = 0; i < numOfColumns; i++) {
+          row.add(res.getObject(i + 1).toString());
+        }
+        queryResult.add(row);
+        rowCount++;
+      }
+    }
   }
 }