You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/08/03 10:18:00 UTC

tajo git commit: TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho)

Repository: tajo
Updated Branches:
  refs/heads/master 1be0e66b2 -> d8ce56263


TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho)

Closes #672


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d8ce5626
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d8ce5626
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d8ce5626

Branch: refs/heads/master
Commit: d8ce5626342e64bf02be92905e8ed2a147ef85ca
Parents: 1be0e66
Author: Jinho Kim <jh...@apache.org>
Authored: Mon Aug 3 17:17:04 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Mon Aug 3 17:17:04 2015 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../org/apache/tajo/master/QueryManager.java    | 52 ++++++------
 .../apache/tajo/util/history/HistoryReader.java | 87 ++++++++++----------
 .../src/main/resources/webapps/admin/query.jsp  |  6 +-
 .../util/history/TestHistoryWriterReader.java   | 64 ++++++++++++++
 5 files changed, 137 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 19dd980..3deac6d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -208,6 +208,8 @@ Release 0.11.0 - unreleased
 
   BUG FIXES
 
+    TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho)
+
     TAJO-1731: With a task failure, query processing is hanged after first retry. 
     (jihoon)
 

http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 8be298e..8838986 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.tajo.master;
 
-import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.apache.commons.collections.map.LRUMap;
@@ -126,9 +125,7 @@ public class QueryManager extends CompositeService {
     }
 
     try {
-      synchronized (this) {
-        result.addAll(this.masterContext.getHistoryReader().getQueriesInHistory());
-      }
+      result.addAll(this.masterContext.getHistoryReader().getQueriesInHistory());
       return result;
     } catch (Throwable e) {
       LOG.error(e, e);
@@ -137,34 +134,35 @@ public class QueryManager extends CompositeService {
   }
 
   /**
-   * Get query history in cache or persistent storage
+   * Get desc ordered query histories in cache or persistent storage
+   * @param page index of page
+   * @param size size of page
    */
-  public Collection<QueryInfo> getFinishedQueries(int page, int size) {
-    TreeSet<QueryInfo> result = Sets.newTreeSet();
-    if(page <= 0 || size <= 0) {
-      return result;
+  public List<QueryInfo> getFinishedQueries(int page, int size) {
+    if (page <= 0 || size <= 0) {
+      return Collections.EMPTY_LIST;
     }
 
-    List<QueryInfo> cacheList = Lists.newArrayList();
-    synchronized (historyCache) {
+    if (page * size <= historyCache.size()) {
+      Set<QueryInfo> result = Sets.newTreeSet(Collections.reverseOrder());
       // request size fits in cache
-      if (page == 1 && size <= historyCache.size()) {
-        cacheList.addAll(historyCache.values());
+      synchronized (historyCache) {
+        result.addAll(historyCache.values());
       }
-    }
-
-    if (cacheList.size() > 0) {
-      result.addAll(cacheList.subList(0, size));
-      return result;
-    }
-
-    try {
-      synchronized (this) {
+      int fromIndex = (page - 1) * size;
+      return new LinkedList<QueryInfo>(result).subList(fromIndex, fromIndex + size);
+    } else {
+      try {
         return this.masterContext.getHistoryReader().getQueriesInHistory(page, size);
+      } catch (Throwable e) {
+        LOG.error(e, e);
+        Set<QueryInfo> result = Sets.newTreeSet(Collections.reverseOrder());
+        // reading the history storage failed; fall back to all histories held in the cache
+        synchronized (historyCache) {
+          result.addAll(historyCache.values());
+        }
+        return new LinkedList<QueryInfo>(result);
       }
-    } catch (Throwable e) {
-      LOG.error(e, e);
-      return result;
     }
   }
 
@@ -175,9 +173,7 @@ public class QueryManager extends CompositeService {
         queryInfo = (QueryInfo) historyCache.get(queryId);
       }
       if (queryInfo == null) {
-        synchronized (this) {
-          queryInfo = this.masterContext.getHistoryReader().getQueryByQueryId(queryId);
-        }
+        queryInfo = this.masterContext.getHistoryReader().getQueryByQueryId(queryId);
       }
       return queryInfo;
     } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index 27d823e..66077cf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.tajo.util.history;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -62,41 +64,37 @@ public class HistoryReader {
     return getQueriesInHistory(-1, Integer.MAX_VALUE);
   }
 
+  /**
+   * Get desc ordered query histories in persistent storage
+   * @param page index of page
+   * @param size size of page
+   */
   public List<QueryInfo> getQueriesInHistory(int page, int size) throws IOException {
-    List<QueryInfo> queryList = getQueryInfoInHistory(page, size, null);
-    if (queryList.size() > size) {
-      queryList = queryList.subList(0, size);
-    }
-
-    Collections.sort(queryList, new Comparator<QueryInfo>() {
-      @Override
-      public int compare(QueryInfo query1, QueryInfo query2) {
-        return query2.compareTo(query1);
-      }
-    });
-
-    return queryList;
+    return findQueryInfoInStorage(page, size, null);
   }
 
-  private List<QueryInfo> getQueryInfoInHistory(int page, int size, @Nullable QueryId queryId) throws IOException {
-    List<QueryInfo> queryInfos = new ArrayList<QueryInfo>();
+  private synchronized List<QueryInfo> findQueryInfoInStorage(int page, int size, @Nullable QueryId queryId)
+      throws IOException {
+    List<QueryInfo> result = Lists.newLinkedList();
 
     FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf);
     try {
       if (!fs.exists(historyParentPath)) {
-        return queryInfos;
+        return result;
       }
     } catch (Throwable e) {
-      return queryInfos;
+      return result;
     }
 
     FileStatus[] files = fs.listStatus(historyParentPath);
     if (files == null || files.length == 0) {
-      return queryInfos;
+      return result;
     }
 
-    int startIndex = page < 1 ? page : (page - 1) * size; // set index to last index of previous page
+    Set<QueryInfo> queryInfos = Sets.newTreeSet(Collections.reverseOrder());
+    int startIndex = page < 1 ? page : ((page - 1) * size) + 1;
     int currentIndex = 0;
+    int skipSize = 0;
 
     ArrayUtils.reverse(files);
     for (FileStatus eachDateFile : files) {
@@ -118,55 +116,60 @@ public class HistoryReader {
         }
 
         FSDataInputStream in = null;
-        long totalLength = 0;
 
+        List<String> jsonList = Lists.newArrayList();
         try {
           in = fs.open(path);
 
-          while (totalLength < eachFile.getLen()) {
+          //If the history file has not been closed yet, FileStatus.getLen() is not updated.
+          //So, this loop relies on EOFException to detect the end of the file.
+          while (true) {
             int length = in.readInt();
-            totalLength += 4;
-
-            currentIndex++;
-            //skip previous page
-            if (startIndex >= currentIndex) {
-              totalLength += in.skipBytes(length);
-              continue;
-            }
 
             byte[] buf = new byte[length];
             in.readFully(buf, 0, length);
-            totalLength += length;
 
-            String queryInfoJson = new String(buf, 0, length, Bytes.UTF8_CHARSET);
-            QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson);
+            jsonList.add(new String(buf, 0, length, Bytes.UTF8_CHARSET));
+            currentIndex++;
+          }
+        } catch (EOFException eof) {
+        } catch (Throwable e) {
+          LOG.warn("Reading error:" + path + ", " + e.getMessage());
+        } finally {
+          IOUtils.cleanup(LOG, in);
+        }
 
+        //skip previous page
+        if (startIndex > currentIndex) {
+          skipSize += jsonList.size();
+        } else {
+          for (String json : jsonList) {
+            QueryInfo queryInfo = QueryInfo.fromJson(json);
             if (queryId != null) {
               if (queryInfo.getQueryId().equals(queryId)) {
-                queryInfos.add(queryInfo);
-                return queryInfos;
+                result.add(queryInfo);
+                return result;
               }
             } else {
               queryInfos.add(queryInfo);
             }
           }
-        } catch (Throwable e) {
-          LOG.warn("Reading error:" + path + ", " + e.getMessage());
-        } finally {
-          IOUtils.cleanup(LOG, in);
         }
 
-        if (queryInfos.size() >= size) {
-          return queryInfos;
+        if (currentIndex - (startIndex - 1) >= size) {
+          result.addAll(queryInfos);
+          int fromIndex = (startIndex - 1) - skipSize;
+          return result.subList(fromIndex, fromIndex + size);
         }
       }
     }
 
-    return queryInfos;
+    result.addAll(queryInfos);
+    return result;
   }
 
   public QueryInfo getQueryByQueryId(QueryId queryId) throws IOException {
-    List<QueryInfo> queryInfoList = getQueryInfoInHistory(-1, Integer.MAX_VALUE, queryId);
+    List<QueryInfo> queryInfoList = findQueryInfoInStorage(-1, Integer.MAX_VALUE, queryId);
     if (queryInfoList.size() > 0) {
       return queryInfoList.get(0);
     } else {

http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 7422a03..bcf7766 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -59,10 +59,8 @@
     }
   }
 
-  List<QueryInfo> finishedQueries = new ArrayList<QueryInfo>(
-          master.getContext().getQueryJobManager().getFinishedQueries(currentPage, pageSize));
-  Collections.sort(finishedQueries, java.util.Collections.reverseOrder());
-
+  List<QueryInfo> finishedQueries =
+          master.getContext().getQueryJobManager().getFinishedQueries(currentPage, pageSize);
   SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
 
   Map<Integer, NodeStatus> workers = master.getContext().getResourceManager().getNodes();

http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
index aee418c..3d2578c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
+++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -45,6 +45,7 @@ import static org.junit.Assert.*;
 public class TestHistoryWriterReader extends QueryTestCaseBase {
   public static final String HISTORY_DIR = "/tmp/tajo-test-history";
   TajoConf tajoConf;
+
   @Before
   public void setUp() throws Exception {
     tajoConf = new TajoConf(testingCluster.getConfiguration());
@@ -119,6 +120,69 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
   }
 
   @Test
+  public void testQueryInfoPagination() throws Exception {
+    HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
+    try {
+      writer.init(tajoConf);
+      writer.start();
+
+      long startTime = System.currentTimeMillis();
+      int testSize = 10;
+      QueryInfo queryInfo;
+
+      for (int i = 1; i < testSize + 1; i++) {
+        queryInfo = new QueryInfo(QueryIdFactory.newQueryId(startTime, i));
+        queryInfo.setStartTime(startTime);
+        queryInfo.setProgress(1.0f);
+        queryInfo.setQueryState(QueryState.QUERY_SUCCEEDED);
+
+        if (testSize == i) {
+          writer.appendAndSync(queryInfo);
+        } else {
+          writer.appendHistory(queryInfo);
+        }
+      }
+
+      SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+      Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
+
+      FileSystem fs = path.getFileSystem(tajoConf);
+      Path parentPath = new Path(path, df.format(startTime) + "/query-list");
+      FileStatus[] histFiles = fs.listStatus(parentPath);
+      assertNotNull(histFiles);
+      assertEquals(1, histFiles.length);
+      assertTrue(histFiles[0].isFile());
+      assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
+
+      HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+      List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, testSize);
+      assertNotNull(queryInfos);
+      assertEquals(testSize, queryInfos.size());
+
+      // the pagination api returns a descending ordered list
+      for (int i = 0; i < testSize; i++) {
+        assertEquals(testSize - i, queryInfos.get(i).getQueryId().getSeq());
+      }
+
+      int pages = 5;
+      int pageSize = testSize / pages;
+      int expectIdSequence = testSize;
+      //page numbers are 1-based, so iteration starts at page 1
+      for (int i = 1; i < pages + 1; i++) {
+        queryInfos = reader.getQueriesInHistory(i, pageSize);
+        assertNotNull(queryInfos);
+        assertEquals(pageSize, queryInfos.size());
+
+        for (QueryInfo qInfo : queryInfos) {
+          assertEquals(expectIdSequence--, qInfo.getQueryId().getSeq());
+        }
+      }
+    } finally {
+      writer.stop();
+    }
+  }
+
+  @Test
   public void testQueryHistoryReadAndWrite() throws Exception {
     HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
     writer.init(tajoConf);