You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2015/08/03 10:18:00 UTC
tajo git commit: TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho)
Repository: tajo
Updated Branches:
refs/heads/master 1be0e66b2 -> d8ce56263
TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho)
Closes #672
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/d8ce5626
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/d8ce5626
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/d8ce5626
Branch: refs/heads/master
Commit: d8ce5626342e64bf02be92905e8ed2a147ef85ca
Parents: 1be0e66
Author: Jinho Kim <jh...@apache.org>
Authored: Mon Aug 3 17:17:04 2015 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Mon Aug 3 17:17:04 2015 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../org/apache/tajo/master/QueryManager.java | 52 ++++++------
.../apache/tajo/util/history/HistoryReader.java | 87 ++++++++++----------
.../src/main/resources/webapps/admin/query.jsp | 6 +-
.../util/history/TestHistoryWriterReader.java | 64 ++++++++++++++
5 files changed, 137 insertions(+), 74 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index 19dd980..3deac6d 100644
--- a/CHANGES
+++ b/CHANGES
@@ -208,6 +208,8 @@ Release 0.11.0 - unreleased
BUG FIXES
+ TAJO-1733: Finished query occasionally does not appear in Web-UI. (jinho)
+
TAJO-1731: With a task failure, query processing is hanged after first retry.
(jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
index 8be298e..8838986 100644
--- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
+++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java
@@ -18,7 +18,6 @@
package org.apache.tajo.master;
-import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.collections.map.LRUMap;
@@ -126,9 +125,7 @@ public class QueryManager extends CompositeService {
}
try {
- synchronized (this) {
- result.addAll(this.masterContext.getHistoryReader().getQueriesInHistory());
- }
+ result.addAll(this.masterContext.getHistoryReader().getQueriesInHistory());
return result;
} catch (Throwable e) {
LOG.error(e, e);
@@ -137,34 +134,35 @@ public class QueryManager extends CompositeService {
}
/**
- * Get query history in cache or persistent storage
+ * Get desc ordered query histories in cache or persistent storage
+ * @param page index of page
+ * @param size size of page
*/
- public Collection<QueryInfo> getFinishedQueries(int page, int size) {
- TreeSet<QueryInfo> result = Sets.newTreeSet();
- if(page <= 0 || size <= 0) {
- return result;
+ public List<QueryInfo> getFinishedQueries(int page, int size) {
+ if (page <= 0 || size <= 0) {
+ return Collections.EMPTY_LIST;
}
- List<QueryInfo> cacheList = Lists.newArrayList();
- synchronized (historyCache) {
+ if (page * size <= historyCache.size()) {
+ Set<QueryInfo> result = Sets.newTreeSet(Collections.reverseOrder());
// request size fits in cache
- if (page == 1 && size <= historyCache.size()) {
- cacheList.addAll(historyCache.values());
+ synchronized (historyCache) {
+ result.addAll(historyCache.values());
}
- }
-
- if (cacheList.size() > 0) {
- result.addAll(cacheList.subList(0, size));
- return result;
- }
-
- try {
- synchronized (this) {
+ int fromIndex = (page - 1) * size;
+ return new LinkedList<QueryInfo>(result).subList(fromIndex, fromIndex + size);
+ } else {
+ try {
return this.masterContext.getHistoryReader().getQueriesInHistory(page, size);
+ } catch (Throwable e) {
+ LOG.error(e, e);
+ Set<QueryInfo> result = Sets.newTreeSet(Collections.reverseOrder());
+ // reading from history storage failed; fall back to the cached histories
+ synchronized (historyCache) {
+ result.addAll(historyCache.values());
+ }
+ return new LinkedList<QueryInfo>(result);
}
- } catch (Throwable e) {
- LOG.error(e, e);
- return result;
}
}
@@ -175,9 +173,7 @@ public class QueryManager extends CompositeService {
queryInfo = (QueryInfo) historyCache.get(queryId);
}
if (queryInfo == null) {
- synchronized (this) {
- queryInfo = this.masterContext.getHistoryReader().getQueryByQueryId(queryId);
- }
+ queryInfo = this.masterContext.getHistoryReader().getQueryByQueryId(queryId);
}
return queryInfo;
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
index 27d823e..66077cf 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java
@@ -18,6 +18,8 @@
package org.apache.tajo.util.history;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.lang.ArrayUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -62,41 +64,37 @@ public class HistoryReader {
return getQueriesInHistory(-1, Integer.MAX_VALUE);
}
+ /**
+ * Get desc ordered query histories in persistent storage
+ * @param page index of page
+ * @param size size of page
+ */
public List<QueryInfo> getQueriesInHistory(int page, int size) throws IOException {
- List<QueryInfo> queryList = getQueryInfoInHistory(page, size, null);
- if (queryList.size() > size) {
- queryList = queryList.subList(0, size);
- }
-
- Collections.sort(queryList, new Comparator<QueryInfo>() {
- @Override
- public int compare(QueryInfo query1, QueryInfo query2) {
- return query2.compareTo(query1);
- }
- });
-
- return queryList;
+ return findQueryInfoInStorage(page, size, null);
}
- private List<QueryInfo> getQueryInfoInHistory(int page, int size, @Nullable QueryId queryId) throws IOException {
- List<QueryInfo> queryInfos = new ArrayList<QueryInfo>();
+ private synchronized List<QueryInfo> findQueryInfoInStorage(int page, int size, @Nullable QueryId queryId)
+ throws IOException {
+ List<QueryInfo> result = Lists.newLinkedList();
FileSystem fs = HistoryWriter.getNonCrcFileSystem(historyParentPath, tajoConf);
try {
if (!fs.exists(historyParentPath)) {
- return queryInfos;
+ return result;
}
} catch (Throwable e) {
- return queryInfos;
+ return result;
}
FileStatus[] files = fs.listStatus(historyParentPath);
if (files == null || files.length == 0) {
- return queryInfos;
+ return result;
}
- int startIndex = page < 1 ? page : (page - 1) * size; // set index to last index of previous page
+ Set<QueryInfo> queryInfos = Sets.newTreeSet(Collections.reverseOrder());
+ int startIndex = page < 1 ? page : ((page - 1) * size) + 1;
int currentIndex = 0;
+ int skipSize = 0;
ArrayUtils.reverse(files);
for (FileStatus eachDateFile : files) {
@@ -118,55 +116,60 @@ public class HistoryReader {
}
FSDataInputStream in = null;
- long totalLength = 0;
+ List<String> jsonList = Lists.newArrayList();
try {
in = fs.open(path);
- while (totalLength < eachFile.getLen()) {
+ //If the history file has not been closed yet, FileStatus.getLen() is not updated,
+ //so this loop reads until an EOFException is thrown instead of relying on the file length
+ while (true) {
int length = in.readInt();
- totalLength += 4;
-
- currentIndex++;
- //skip previous page
- if (startIndex >= currentIndex) {
- totalLength += in.skipBytes(length);
- continue;
- }
byte[] buf = new byte[length];
in.readFully(buf, 0, length);
- totalLength += length;
- String queryInfoJson = new String(buf, 0, length, Bytes.UTF8_CHARSET);
- QueryInfo queryInfo = QueryInfo.fromJson(queryInfoJson);
+ jsonList.add(new String(buf, 0, length, Bytes.UTF8_CHARSET));
+ currentIndex++;
+ }
+ } catch (EOFException eof) {
+ } catch (Throwable e) {
+ LOG.warn("Reading error:" + path + ", " + e.getMessage());
+ } finally {
+ IOUtils.cleanup(LOG, in);
+ }
+ //skip previous page
+ if (startIndex > currentIndex) {
+ skipSize += jsonList.size();
+ } else {
+ for (String json : jsonList) {
+ QueryInfo queryInfo = QueryInfo.fromJson(json);
if (queryId != null) {
if (queryInfo.getQueryId().equals(queryId)) {
- queryInfos.add(queryInfo);
- return queryInfos;
+ result.add(queryInfo);
+ return result;
}
} else {
queryInfos.add(queryInfo);
}
}
- } catch (Throwable e) {
- LOG.warn("Reading error:" + path + ", " + e.getMessage());
- } finally {
- IOUtils.cleanup(LOG, in);
}
- if (queryInfos.size() >= size) {
- return queryInfos;
+ if (currentIndex - (startIndex - 1) >= size) {
+ result.addAll(queryInfos);
+ int fromIndex = (startIndex - 1) - skipSize;
+ return result.subList(fromIndex, fromIndex + size);
}
}
}
- return queryInfos;
+ result.addAll(queryInfos);
+ return result;
}
public QueryInfo getQueryByQueryId(QueryId queryId) throws IOException {
- List<QueryInfo> queryInfoList = getQueryInfoInHistory(-1, Integer.MAX_VALUE, queryId);
+ List<QueryInfo> queryInfoList = findQueryInfoInStorage(-1, Integer.MAX_VALUE, queryId);
if (queryInfoList.size() > 0) {
return queryInfoList.get(0);
} else {
http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/main/resources/webapps/admin/query.jsp
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp
index 7422a03..bcf7766 100644
--- a/tajo-core/src/main/resources/webapps/admin/query.jsp
+++ b/tajo-core/src/main/resources/webapps/admin/query.jsp
@@ -59,10 +59,8 @@
}
}
- List<QueryInfo> finishedQueries = new ArrayList<QueryInfo>(
- master.getContext().getQueryJobManager().getFinishedQueries(currentPage, pageSize));
- Collections.sort(finishedQueries, java.util.Collections.reverseOrder());
-
+ List<QueryInfo> finishedQueries =
+ master.getContext().getQueryJobManager().getFinishedQueries(currentPage, pageSize);
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
Map<Integer, NodeStatus> workers = master.getContext().getResourceManager().getNodes();
http://git-wip-us.apache.org/repos/asf/tajo/blob/d8ce5626/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
index aee418c..3d2578c 100644
--- a/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
+++ b/tajo-core/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -45,6 +45,7 @@ import static org.junit.Assert.*;
public class TestHistoryWriterReader extends QueryTestCaseBase {
public static final String HISTORY_DIR = "/tmp/tajo-test-history";
TajoConf tajoConf;
+
@Before
public void setUp() throws Exception {
tajoConf = new TajoConf(testingCluster.getConfiguration());
@@ -119,6 +120,69 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
}
@Test
+ public void testQueryInfoPagination() throws Exception {
+ HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
+ try {
+ writer.init(tajoConf);
+ writer.start();
+
+ long startTime = System.currentTimeMillis();
+ int testSize = 10;
+ QueryInfo queryInfo;
+
+ for (int i = 1; i < testSize + 1; i++) {
+ queryInfo = new QueryInfo(QueryIdFactory.newQueryId(startTime, i));
+ queryInfo.setStartTime(startTime);
+ queryInfo.setProgress(1.0f);
+ queryInfo.setQueryState(QueryState.QUERY_SUCCEEDED);
+
+ if (testSize == i) {
+ writer.appendAndSync(queryInfo);
+ } else {
+ writer.appendHistory(queryInfo);
+ }
+ }
+
+ SimpleDateFormat df = new SimpleDateFormat("yyyyMMdd");
+ Path path = new Path(tajoConf.getVar(ConfVars.HISTORY_QUERY_DIR));
+
+ FileSystem fs = path.getFileSystem(tajoConf);
+ Path parentPath = new Path(path, df.format(startTime) + "/query-list");
+ FileStatus[] histFiles = fs.listStatus(parentPath);
+ assertNotNull(histFiles);
+ assertEquals(1, histFiles.length);
+ assertTrue(histFiles[0].isFile());
+ assertTrue(histFiles[0].getPath().getName().endsWith(".hist"));
+
+ HistoryReader reader = new HistoryReader("127.0.0.1:28090", tajoConf);
+ List<QueryInfo> queryInfos = reader.getQueriesInHistory(1, testSize);
+ assertNotNull(queryInfos);
+ assertEquals(testSize, queryInfos.size());
+
+ // the pagination api returns a descending ordered list
+ for (int i = 0; i < testSize; i++) {
+ assertEquals(testSize - i, queryInfos.get(i).getQueryId().getSeq());
+ }
+
+ int pages = 5;
+ int pageSize = testSize / pages;
+ int expectIdSequence = testSize;
+ //min startIndex of page is 1
+ for (int i = 1; i < pages + 1; i++) {
+ queryInfos = reader.getQueriesInHistory(i, pageSize);
+ assertNotNull(queryInfos);
+ assertEquals(pageSize, queryInfos.size());
+
+ for (QueryInfo qInfo : queryInfos) {
+ assertEquals(expectIdSequence--, qInfo.getQueryId().getSeq());
+ }
+ }
+ } finally {
+ writer.stop();
+ }
+ }
+
+ @Test
public void testQueryHistoryReadAndWrite() throws Exception {
HistoryWriter writer = new HistoryWriter("127.0.0.1:28090", true);
writer.init(tajoConf);