You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@tajo.apache.org by jh...@apache.org on 2016/05/10 02:32:08 UTC
tajo git commit: TAJO-2143: Fix race condition in task history writer.
Repository: tajo
Updated Branches:
refs/heads/master 7f9c4c950 -> 34c5d5f24
TAJO-2143: Fix race condition in task history writer.
Closes #1013
Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/34c5d5f2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/34c5d5f2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/34c5d5f2
Branch: refs/heads/master
Commit: 34c5d5f24c48683f59925cafbec04a832e532753
Parents: 7f9c4c9
Author: Jinho Kim <jh...@apache.org>
Authored: Tue May 10 11:31:26 2016 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Tue May 10 11:31:26 2016 +0900
----------------------------------------------------------------------
CHANGES | 2 +
.../util/history/TestHistoryWriterReader.java | 6 ++-
.../apache/tajo/util/history/HistoryWriter.java | 41 +++++++++++++-------
3 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tajo/blob/34c5d5f2/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ad9ca05..c653d99 100644
--- a/CHANGES
+++ b/CHANGES
@@ -142,6 +142,8 @@ Release 0.12.0 - unreleased
BUG FIXES
+ TAJO-2143: Fix race condition in task history writer. (jinho)
+
TAJO-2140: TajoInternalError does not contains reason stack trace. (jinho)
TAJO-2135: Invalid join result when join key columns contain nulls. (jihoon)
http://git-wip-us.apache.org/repos/asf/tajo/blob/34c5d5f2/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
index 8b33d02..132def7 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -39,6 +39,7 @@ import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import static org.junit.Assert.*;
@@ -283,7 +284,10 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
- writer.appendAndSync(taskHistory2);
+ writer.appendHistory(taskHistory2);
+
+ HistoryWriter.WriterFuture future = writer.flushTaskHistories();
+ future.get(10, TimeUnit.SECONDS);
SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
String startDate = df.format(new Date(startTime));
http://git-wip-us.apache.org/repos/asf/tajo/blob/34c5d5f2/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index bc62cac..668940c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -175,16 +175,30 @@ public class HistoryWriter extends AbstractService {
}
}
- /* Flushing the buffer */
- public void flushTaskHistories() {
- if (historyQueue.size() > 0) {
- synchronized (writerThread) {
- writerThread.needTaskFlush.set(true);
- writerThread.notifyAll();
+ /**
+ * flush all task histories
+ */
+ public WriterFuture flushTaskHistories() {
+
+ WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(null) {
+ public void done(WriterHolder holder) {
+ for(WriterHolder writerHolder : taskWriters.values()) {
+ try {
+ writerHolder.flush();
+ } catch (IOException e) {
+ super.failed(e);
+ }
+
+ }
+ super.done(null);
}
- } else {
- writerThread.flushTaskHistories();
+ };
+
+ historyQueue.add(future);
+ synchronized (writerThread) {
+ writerThread.notifyAll();
}
+ return future;
}
public static FileSystem getNonCrcFileSystem(Path path, Configuration conf) throws IOException {
@@ -225,7 +239,6 @@ public class HistoryWriter extends AbstractService {
}
class WriterThread extends Thread {
- private AtomicBoolean needTaskFlush = new AtomicBoolean(false);
public void run() {
LOG.info("HistoryWriter_" + processName + " started.");
@@ -303,7 +316,11 @@ public class HistoryWriter extends AbstractService {
for (WriterFuture<WriterHolder> future : histories) {
History history = future.getHistory();
- switch (history.getHistoryType()) {
+
+ if(history == null) {
+ future.done(null);
+ } else {
+ switch (history.getHistoryType()) {
case TASK:
try {
future.done(writeTaskHistory((TaskHistory) history));
@@ -334,12 +351,10 @@ public class HistoryWriter extends AbstractService {
break;
default:
LOG.warn("Wrong history type: " + history.getHistoryType());
+ }
}
}
- if(needTaskFlush.getAndSet(false)){
- flushTaskHistories();
- }
return histories;
}