You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tajo.apache.org by jh...@apache.org on 2016/05/10 02:32:08 UTC

tajo git commit: TAJO-2143: Fix race condition in task history writer.

Repository: tajo
Updated Branches:
  refs/heads/master 7f9c4c950 -> 34c5d5f24


TAJO-2143: Fix race condition in task history writer.

Closes #1013


Project: http://git-wip-us.apache.org/repos/asf/tajo/repo
Commit: http://git-wip-us.apache.org/repos/asf/tajo/commit/34c5d5f2
Tree: http://git-wip-us.apache.org/repos/asf/tajo/tree/34c5d5f2
Diff: http://git-wip-us.apache.org/repos/asf/tajo/diff/34c5d5f2

Branch: refs/heads/master
Commit: 34c5d5f24c48683f59925cafbec04a832e532753
Parents: 7f9c4c9
Author: Jinho Kim <jh...@apache.org>
Authored: Tue May 10 11:31:26 2016 +0900
Committer: Jinho Kim <jh...@apache.org>
Committed: Tue May 10 11:31:26 2016 +0900

----------------------------------------------------------------------
 CHANGES                                         |  2 +
 .../util/history/TestHistoryWriterReader.java   |  6 ++-
 .../apache/tajo/util/history/HistoryWriter.java | 41 +++++++++++++-------
 3 files changed, 35 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tajo/blob/34c5d5f2/CHANGES
----------------------------------------------------------------------
diff --git a/CHANGES b/CHANGES
index ad9ca05..c653d99 100644
--- a/CHANGES
+++ b/CHANGES
@@ -142,6 +142,8 @@ Release 0.12.0 - unreleased
 
   BUG FIXES
 
+    TAJO-2143: Fix race condition in task history writer. (jinho)
+
     TAJO-2140: TajoInternalError does not contains reason stack trace. (jinho)
 
     TAJO-2135: Invalid join result when join key columns contain nulls. (jihoon)

http://git-wip-us.apache.org/repos/asf/tajo/blob/34c5d5f2/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
----------------------------------------------------------------------
diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
index 8b33d02..132def7 100644
--- a/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
+++ b/tajo-core-tests/src/test/java/org/apache/tajo/util/history/TestHistoryWriterReader.java
@@ -39,6 +39,7 @@ import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.*;
 
@@ -283,7 +284,10 @@ public class TestHistoryWriterReader extends QueryTestCaseBase {
       TaskAttemptId id2 = TajoIdUtils.parseTaskAttemptId("ta_1412326813565_0001_000001_000002_00");
       org.apache.tajo.worker.TaskHistory taskHistory2 = new org.apache.tajo.worker.TaskHistory(
           id2, TaskAttemptState.TA_SUCCEEDED, 1.0f, startTime, System.currentTimeMillis() - 500, tableStats);
-      writer.appendAndSync(taskHistory2);
+      writer.appendHistory(taskHistory2);
+
+      HistoryWriter.WriterFuture future = writer.flushTaskHistories();
+      future.get(10, TimeUnit.SECONDS);
 
       SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH");
       String startDate = df.format(new Date(startTime));

http://git-wip-us.apache.org/repos/asf/tajo/blob/34c5d5f2/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
----------------------------------------------------------------------
diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
index bc62cac..668940c 100644
--- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
+++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java
@@ -175,16 +175,30 @@ public class HistoryWriter extends AbstractService {
     }
   }
 
-  /* Flushing the buffer */
-  public void flushTaskHistories() {
-    if (historyQueue.size() > 0) {
-      synchronized (writerThread) {
-        writerThread.needTaskFlush.set(true);
-        writerThread.notifyAll();
+  /**
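+   *  Queue a flush request for all task histories and wake the writer thread; the returned future is completed once the task writers have been flushed.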
+   */
+  public WriterFuture flushTaskHistories() {
+
+    WriterFuture<WriterHolder> future = new WriterFuture<WriterHolder>(null) {
+      public void done(WriterHolder holder) {
+        for(WriterHolder writerHolder : taskWriters.values()) {
+          try {
+            writerHolder.flush();
+          } catch (IOException e) {
+            super.failed(e);
+          }
+
+        }
+        super.done(null);
       }
-    } else {
-      writerThread.flushTaskHistories();
+    };
+
+    historyQueue.add(future);
+    synchronized (writerThread) {
+      writerThread.notifyAll();
     }
+    return future;
   }
 
   public static FileSystem getNonCrcFileSystem(Path path, Configuration conf) throws IOException {
@@ -225,7 +239,6 @@ public class HistoryWriter extends AbstractService {
   }
 
   class WriterThread extends Thread {
-    private AtomicBoolean needTaskFlush = new AtomicBoolean(false);
 
     public void run() {
       LOG.info("HistoryWriter_" + processName + " started.");
@@ -303,7 +316,11 @@ public class HistoryWriter extends AbstractService {
 
       for (WriterFuture<WriterHolder> future : histories) {
         History history = future.getHistory();
-        switch (history.getHistoryType()) {
+
+        if(history == null) {
+          future.done(null);
+        } else {
+          switch (history.getHistoryType()) {
           case TASK:
             try {
               future.done(writeTaskHistory((TaskHistory) history));
@@ -334,12 +351,10 @@ public class HistoryWriter extends AbstractService {
             break;
           default:
             LOG.warn("Wrong history type: " + history.getHistoryType());
+          }
         }
       }
 
-      if(needTaskFlush.getAndSet(false)){
-        flushTaskHistories();
-      }
       return histories;
     }