You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/08/13 17:59:03 UTC

aurora git commit: DbStorage: avoid flushing for reentrant writes, remove extra @Transactional.

Repository: aurora
Updated Branches:
  refs/heads/master cbc42c484 -> da48ad20b


DbStorage: avoid flushing for reentrant writes, remove extra @Transactional.

Bugs closed: AURORA-1395

Reviewed at https://reviews.apache.org/r/37141/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/da48ad20
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/da48ad20
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/da48ad20

Branch: refs/heads/master
Commit: da48ad20b07ba800911db85e5f65c4112fe18415
Parents: cbc42c4
Author: Bill Farner <wf...@apache.org>
Authored: Thu Aug 13 11:58:48 2015 -0400
Committer: Bill Farner <wf...@apache.org>
Committed: Thu Aug 13 11:58:48 2015 -0400

----------------------------------------------------------------------
 .../aurora/scheduler/storage/db/DbStorage.java  | 32 +++++++++---
 .../scheduler/storage/db/DbStorageTest.java     | 51 ++++++++++++++++++++
 2 files changed, 75 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/da48ad20/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index aac62e2..cf54a3f 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -140,14 +140,29 @@ class DbStorage extends AbstractIdleService implements Storage {
     }
   }
 
+  private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>() {
+    @Override
+    protected Boolean initialValue() {
+      return false;
+    }
+  };
+
+  @Transactional
+  <T, E extends Exception> T transactionedWrite(MutateWork<T, E> work) throws E {
+    return work.apply(storeProvider);
+  }
+
   @Timed("db_storage_write_operation")
   @Override
-  @Transactional
   public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
-    T result;
-    try (SqlSession session = sessionFactory.openSession(false)) {
-      result = work.apply(storeProvider);
-      session.commit();
+    // Only flush for the top-level write() call when calls are reentrant.
+    boolean shouldFlush = !inTransaction.get();
+    if (shouldFlush) {
+      inTransaction.set(true);
+    }
+
+    try {
+      return transactionedWrite(work);
     } catch (PersistenceException e) {
       throw new StorageException(e.getMessage(), e);
     } finally {
@@ -157,10 +172,11 @@ class DbStorage extends AbstractIdleService implements Storage {
       // introduction of DbStorage, but should be revisited.
       // TODO(wfarner): Consider revisiting to execute async work only when the transaction is
       // successful.
-      postTransactionWork.flush();
+      if (shouldFlush) {
+        postTransactionWork.flush();
+        inTransaction.set(false);
+      }
     }
-
-    return result;
   }
 
   @VisibleForTesting

http://git-wip-us.apache.org/repos/asf/aurora/blob/da48ad20/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
index 3b05db9..9725314 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -13,6 +13,8 @@
  */
 package org.apache.aurora.scheduler.storage.db;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import com.twitter.common.testing.easymock.EasyMockTest;
 
 import org.apache.aurora.scheduler.async.FlushableWorkQueue;
@@ -22,6 +24,7 @@ import org.apache.aurora.scheduler.storage.JobUpdateStore;
 import org.apache.aurora.scheduler.storage.LockStore;
 import org.apache.aurora.scheduler.storage.QuotaStore;
 import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
 import org.apache.aurora.scheduler.storage.Storage.MutateWork;
 import org.apache.aurora.scheduler.storage.Storage.StorageException;
 import org.apache.aurora.scheduler.storage.Storage.Work;
@@ -30,11 +33,14 @@ import org.apache.ibatis.exceptions.PersistenceException;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
 import org.easymock.EasyMock;
+import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
 
 import static org.easymock.EasyMock.expect;
+import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 
 public class DbStorageTest extends EasyMockTest {
 
@@ -113,4 +119,49 @@ public class DbStorageTest extends EasyMockTest {
 
     storage.bulkLoad(writeWork);
   }
+
+  @Test
+  public void testFlushWithReentrantWrites() {
+    final AtomicBoolean flushed = new AtomicBoolean(false);
+    flusher.flush();
+    expectLastCall().andAnswer(new IAnswer<Void>() {
+      @Override
+      public Void answer() {
+        flushed.set(true);
+        return null;
+      }
+    });
+
+    control.replay();
+
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(MutableStoreProvider storeProvider) {
+        noopWrite();
+
+        // Should not have flushed yet.
+        assertFalse("flush() should not be called until outer write() completes.", flushed.get());
+      }
+    });
+  }
+
+  private void noopWrite() {
+    storage.write(new MutateWork.NoResult.Quiet() {
+      @Override
+      public void execute(MutableStoreProvider storeProvider) {
+        // No-op.
+      }
+    });
+  }
+
+  @Test
+  public void testFlushWithSeqentialWrites() {
+    flusher.flush();
+    expectLastCall().times(2);
+
+    control.replay();
+
+    noopWrite();
+    noopWrite();
+  }
 }