You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/09/15 03:29:26 UTC
aurora git commit: Make async work queue gating thread-local.
Repository: aurora
Updated Branches:
refs/heads/master 5dccf92f4 -> e658a8a0b
Make async work queue gating thread-local.
Bugs closed: AURORA-1459
Reviewed at https://reviews.apache.org/r/38336/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/e658a8a0
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/e658a8a0
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/e658a8a0
Branch: refs/heads/master
Commit: e658a8a0b71e894a03ffc81dcae981db9d68bcb4
Parents: 5dccf92
Author: Bill Farner <wf...@apache.org>
Authored: Mon Sep 14 18:29:22 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Mon Sep 14 18:29:22 2015 -0700
----------------------------------------------------------------------
config/findbugs/excludeFilter.xml | 2 +-
.../aurora/scheduler/async/AsyncModule.java | 14 +-
.../scheduler/async/FlushableWorkQueue.java | 25 ---
.../scheduler/async/GatedDelayExecutor.java | 71 --------
.../aurora/scheduler/async/GatedWorkQueue.java | 41 +++++
.../scheduler/async/GatingDelayExecutor.java | 98 +++++++++++
.../aurora/scheduler/storage/db/DbModule.java | 12 +-
.../aurora/scheduler/storage/db/DbStorage.java | 82 ++++-----
.../aurora/scheduler/app/SchedulerIT.java | 4 -
.../scheduler/async/GatedDelayExecutorTest.java | 91 ----------
.../async/GatingDelayExecutorTest.java | 170 +++++++++++++++++++
.../pruning/TaskHistoryPrunerTest.java | 33 ++--
.../scheduler/storage/db/DbStorageTest.java | 54 +++---
13 files changed, 411 insertions(+), 286 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/config/findbugs/excludeFilter.xml
----------------------------------------------------------------------
diff --git a/config/findbugs/excludeFilter.xml b/config/findbugs/excludeFilter.xml
index 7c65302..fe3f4ca 100644
--- a/config/findbugs/excludeFilter.xml
+++ b/config/findbugs/excludeFilter.xml
@@ -114,7 +114,7 @@ limitations under the License.
<Match>
<!-- False positives on a check introduced in findbugs 3.0.1 -->
<Or>
- <Class name="org.apache.aurora.scheduler.storage.db.DbStorage" />
+ <Class name="org.apache.aurora.scheduler.storage.db.DbStorage$3" />
<Class name="org.apache.aurora.scheduler.http.api.security.AuthorizeHeaderTokenTest" />
</Or>
<Bug pattern="RV_RETURN_VALUE_IGNORED_NO_SIDE_EFFECT" />
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
index 217b9c0..eccb864 100644
--- a/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/async/AsyncModule.java
@@ -75,8 +75,8 @@ public class AsyncModule extends AbstractModule {
bind(ScheduledThreadPoolExecutor.class).toInstance(afterTransaction);
bind(ScheduledExecutorService.class).toInstance(afterTransaction);
- bind(GatedDelayExecutor.class).in(Singleton.class);
- expose(GatedDelayExecutor.class);
+ bind(GatingDelayExecutor.class).in(Singleton.class);
+ expose(GatingDelayExecutor.class);
bind(RegisterGauges.class).in(Singleton.class);
expose(RegisterGauges.class);
@@ -84,9 +84,9 @@ public class AsyncModule extends AbstractModule {
});
SchedulerServicesModule.addAppStartupServiceBinding(binder()).to(RegisterGauges.class);
- bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
- bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
- bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatedDelayExecutor.class);
+ bind(Executor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
+ bind(DelayExecutor.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
+ bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).to(GatingDelayExecutor.class);
}
static class RegisterGauges extends AbstractIdleService {
@@ -101,13 +101,13 @@ public class AsyncModule extends AbstractModule {
private final StatsProvider statsProvider;
private final ScheduledThreadPoolExecutor executor;
- private final GatedDelayExecutor delayExecutor;
+ private final GatingDelayExecutor delayExecutor;
@Inject
RegisterGauges(
StatsProvider statsProvider,
ScheduledThreadPoolExecutor executor,
- GatedDelayExecutor delayExecutor) {
+ GatingDelayExecutor delayExecutor) {
this.statsProvider = requireNonNull(statsProvider);
this.executor = requireNonNull(executor);
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
deleted file mode 100644
index 11a1c2a..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/FlushableWorkQueue.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-/**
- * A work queue that only executes pending work when flushed.
- */
-public interface FlushableWorkQueue {
-
- /**
- * Makes pending work available for execution.
- */
- void flush();
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
deleted file mode 100644
index 9d4cfcf..0000000
--- a/src/main/java/org/apache/aurora/scheduler/async/GatedDelayExecutor.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.Queue;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.inject.Inject;
-
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * An executor that queues work until flushed.
- */
-class GatedDelayExecutor implements DelayExecutor, FlushableWorkQueue {
-
- private final ScheduledExecutorService executor;
- private final Queue<Runnable> queue = Lists.newLinkedList();
-
- /**
- * Creates a gated delay executor that will flush work to the provided {@code delegate}.
- *
- * @param delegate Delegate to execute work with when flushed.
- */
- @Inject
- GatedDelayExecutor(ScheduledExecutorService delegate) {
- this.executor = requireNonNull(delegate);
- }
-
- synchronized int getQueueSize() {
- return queue.size();
- }
-
- private synchronized void enqueue(Runnable work) {
- queue.add(work);
- }
-
- @Override
- public synchronized void flush() {
- for (Runnable work : Iterables.consumingIterable(queue)) {
- work.run();
- }
- }
-
- @Override
- public synchronized void execute(Runnable command) {
- enqueue(() -> executor.execute(command));
- }
-
- @Override
- public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) {
- enqueue(() -> executor.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit()));
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java b/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
new file mode 100644
index 0000000..7032271
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/GatedWorkQueue.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+/**
+ * A work queue that only executes pending work when flushed.
+ */
+public interface GatedWorkQueue {
+
+ /**
+ * Closes the gate on the work queue for the duration of an operation.
+ *
+ * @param operation Operation to execute while keeping the gate closed.
+ * @param <T> Operation return type.
+ * @param <E> Operation exception type.
+ * @return The value returned by the {@code operation}.
+ * @throws E Exception thrown by the {@code operation}.
+ */
+ <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E;
+
+ /**
+ * Operation prevents new items from being executed on the work queue.
+ *
+ * @param <T> Operation return type.
+ * @param <E> Operation exception type.
+ */
+ interface GatedOperation<T, E extends Exception> {
+ T doWithGateClosed() throws E;
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java b/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
new file mode 100644
index 0000000..a7240ae
--- /dev/null
+++ b/src/main/java/org/apache/aurora/scheduler/async/GatingDelayExecutor.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.Queue;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.inject.Inject;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An executor that may be temporarily gated with {@link #closeDuring(GatedOperation)}. When the
+ * executor is gated, newly-submitted work will be enqueued and executed once the gate is opened as
+ * a result of {@link #closeDuring(GatedOperation)} returning.
+ */
+class GatingDelayExecutor implements DelayExecutor, GatedWorkQueue {
+
+ private final ScheduledExecutorService gated;
+ private final Queue<Runnable> queue = Lists.newLinkedList();
+
+ /**
+ * Creates a gating delay executor that will gate work from the provided executor.
+ *
+ * @param gated Delegate to execute work with when ungated.
+ */
+ @Inject
+ GatingDelayExecutor(ScheduledExecutorService gated) {
+ this.gated = requireNonNull(gated);
+ }
+
+ private final ThreadLocal<Boolean> isOpen = new ThreadLocal<Boolean>() {
+ @Override
+ protected Boolean initialValue() {
+ return true;
+ }
+ };
+
+ @Override
+ public <T, E extends Exception> T closeDuring(GatedOperation<T, E> operation) throws E {
+ boolean startedOpen = isOpen.get();
+ isOpen.set(false);
+
+ try {
+ return operation.doWithGateClosed();
+ } finally {
+ if (startedOpen) {
+ isOpen.set(true);
+ flush();
+ }
+ }
+ }
+
+ synchronized int getQueueSize() {
+ return queue.size();
+ }
+
+ private synchronized void enqueue(Runnable work) {
+ if (isOpen.get()) {
+ work.run();
+ } else {
+ queue.add(work);
+ }
+ }
+
+ private synchronized void flush() {
+ for (Runnable work : Iterables.consumingIterable(queue)) {
+ work.run();
+ }
+ }
+
+ @Override
+ public synchronized void execute(Runnable command) {
+ enqueue(() -> gated.execute(command));
+ }
+
+ @Override
+ public synchronized void execute(Runnable work, Amount<Long, Time> minDelay) {
+ enqueue(() -> gated.schedule(work, minDelay.getValue(), minDelay.getUnit().getTimeUnit()));
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
index 6da6193..e3efbdb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbModule.java
@@ -39,7 +39,7 @@ import org.apache.aurora.common.quantity.Amount;
import org.apache.aurora.common.quantity.Time;
import org.apache.aurora.scheduler.SchedulerServicesModule;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -156,7 +156,15 @@ public final class DbModule extends PrivateModule {
new AbstractModule() {
@Override
protected void configure() {
- bind(FlushableWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(() -> { });
+ bind(GatedWorkQueue.class).annotatedWith(AsyncExecutor.class).toInstance(
+ new GatedWorkQueue() {
+ @Override
+ public <T, E extends Exception> T closeDuring(
+ GatedOperation<T, E> operation) throws E {
+
+ return operation.doWithGateClosed();
+ }
+ });
}
},
new DbModule(
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 6036570..dd7e1d3 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -29,7 +29,8 @@ import org.apache.aurora.gen.JobUpdateStatus;
import org.apache.aurora.gen.MaintenanceMode;
import org.apache.aurora.gen.ScheduleStatus;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -61,13 +62,13 @@ class DbStorage extends AbstractIdleService implements Storage {
private final SqlSessionFactory sessionFactory;
private final MutableStoreProvider storeProvider;
private final EnumValueMapper enumValueMapper;
- private final FlushableWorkQueue postTransactionWork;
+ private final GatedWorkQueue gatedWorkQueue;
@Inject
DbStorage(
SqlSessionFactory sessionFactory,
EnumValueMapper enumValueMapper,
- @AsyncExecutor FlushableWorkQueue postTransactionWork,
+ @AsyncExecutor GatedWorkQueue gatedWorkQueue,
final CronJobStore.Mutable cronJobStore,
final TaskStore.Mutable taskStore,
final SchedulerStore.Mutable schedulerStore,
@@ -78,7 +79,7 @@ class DbStorage extends AbstractIdleService implements Storage {
this.sessionFactory = requireNonNull(sessionFactory);
this.enumValueMapper = requireNonNull(enumValueMapper);
- this.postTransactionWork = requireNonNull(postTransactionWork);
+ this.gatedWorkQueue = requireNonNull(gatedWorkQueue);
requireNonNull(cronJobStore);
requireNonNull(taskStore);
requireNonNull(schedulerStore);
@@ -140,13 +141,6 @@ class DbStorage extends AbstractIdleService implements Storage {
}
}
- private final ThreadLocal<Boolean> inTransaction = new ThreadLocal<Boolean>() {
- @Override
- protected Boolean initialValue() {
- return false;
- }
- };
-
@Transactional
<T, E extends Exception> T transactionedWrite(MutateWork<T, E> work) throws E {
return work.apply(storeProvider);
@@ -155,28 +149,22 @@ class DbStorage extends AbstractIdleService implements Storage {
@Timed("db_storage_write_operation")
@Override
public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
- // Only flush for the top-level write() call when calls are reentrant.
- boolean shouldFlush = !inTransaction.get();
- if (shouldFlush) {
- inTransaction.set(true);
- }
-
- try {
- return transactionedWrite(work);
- } catch (PersistenceException e) {
- throw new StorageException(e.getMessage(), e);
- } finally {
- // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded.
- // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks
- // due to failure of an unrelated transaction. This matches behavior prior to the
- // introduction of DbStorage, but should be revisited.
- // TODO(wfarner): Consider revisiting to execute async work only when the transaction is
- // successful.
- if (shouldFlush) {
- postTransactionWork.flush();
- inTransaction.set(false);
+ // NOTE: Async work is intentionally executed regardless of whether the transaction succeeded.
+ // Doing otherwise runs the risk of cross-talk between transactions and losing async tasks
+ // due to failure of an unrelated transaction. This matches behavior prior to the
+ // introduction of DbStorage, but should be revisited.
+ // TODO(wfarner): Consider revisiting to execute async work only when the transaction is
+ // successful.
+ return gatedWorkQueue.closeDuring(new GatedOperation<T, E>() {
+ @Override
+ public T doWithGateClosed() throws E {
+ try {
+ return transactionedWrite(work);
+ } catch (PersistenceException e) {
+ throw new StorageException(e.getMessage(), e);
+ }
}
- }
+ });
}
@VisibleForTesting
@@ -191,20 +179,24 @@ class DbStorage extends AbstractIdleService implements Storage {
public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
throws StorageException, E {
- // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk
- // insert.
- try (SqlSession session = sessionFactory.openSession(false)) {
- try {
- session.update(DISABLE_UNDO_LOG);
- work.apply(storeProvider);
- } catch (PersistenceException e) {
- throw new StorageException(e.getMessage(), e);
- } finally {
- session.update(ENABLE_UNDO_LOG);
+ gatedWorkQueue.closeDuring(new GatedOperation<Void, E>() {
+ @Override
+ public Void doWithGateClosed() throws E {
+ // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk
+ // insert.
+ try (SqlSession session = sessionFactory.openSession(false)) {
+ try {
+ session.update(DISABLE_UNDO_LOG);
+ work.apply(storeProvider);
+ } catch (PersistenceException e) {
+ throw new StorageException(e.getMessage(), e);
+ } finally {
+ session.update(ENABLE_UNDO_LOG);
+ }
+ }
+ return null;
}
- } finally {
- postTransactionWork.flush();
- }
+ });
}
@Override
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index 4941128..a44b9da 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -72,8 +72,6 @@ import org.apache.aurora.gen.storage.Transaction;
import org.apache.aurora.gen.storage.storageConstants;
import org.apache.aurora.scheduler.AppStartup;
import org.apache.aurora.scheduler.ResourceSlot;
-import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
import org.apache.aurora.scheduler.log.Log;
import org.apache.aurora.scheduler.log.Log.Entry;
import org.apache.aurora.scheduler.log.Log.Position;
@@ -378,8 +376,6 @@ public class SchedulerIT extends BaseZooKeeperTest {
scheduler.getValue().registered(driver,
FrameworkID.newBuilder().setValue(FRAMEWORK_ID).build(),
MasterInfo.getDefaultInstance());
- // Registration is published on the event bus, which will be gated until a flush.
- injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class)).flush();
awaitSchedulerReady();
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java b/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
deleted file mode 100644
index 2867633..0000000
--- a/src/test/java/org/apache/aurora/scheduler/async/GatedDelayExecutorTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.async;
-
-import java.util.concurrent.ScheduledExecutorService;
-
-import org.apache.aurora.common.quantity.Amount;
-import org.apache.aurora.common.quantity.Time;
-import org.apache.aurora.common.testing.easymock.EasyMockTest;
-
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.easymock.IExpectationSetters;
-import org.junit.Before;
-import org.junit.Test;
-
-import static org.easymock.EasyMock.eq;
-import static org.easymock.EasyMock.expectLastCall;
-
-public class GatedDelayExecutorTest extends EasyMockTest {
-
- private static final Amount<Long, Time> ONE_SECOND = Amount.of(1L, Time.SECONDS);
-
- private ScheduledExecutorService mockExecutor;
- private Runnable runnable;
- private GatedDelayExecutor gatedExecutor;
-
- @Before
- public void setUp() {
- mockExecutor = createMock(ScheduledExecutorService.class);
- runnable = createMock(Runnable.class);
- gatedExecutor = new GatedDelayExecutor(mockExecutor);
- }
-
- @Test
- public void testNoFlush() {
- control.replay();
-
- gatedExecutor.execute(runnable);
- // flush() is not called, so no work is performed.
- }
-
- private IExpectationSetters<?> invokeWorkWhenSubmitted() {
- return expectLastCall().andAnswer(new IAnswer<Object>() {
- @Override
- public Object answer() {
- ((Runnable) EasyMock.getCurrentArguments()[0]).run();
- return null;
- }
- });
- }
-
- @Test
- public void testExecute() {
- mockExecutor.execute(EasyMock.<Runnable>anyObject());
- invokeWorkWhenSubmitted();
- runnable.run();
- expectLastCall();
-
- control.replay();
-
- gatedExecutor.execute(runnable);
- gatedExecutor.flush();
- }
-
- @Test
- public void testExecuteAfterDelay() {
- mockExecutor.schedule(
- EasyMock.<Runnable>anyObject(),
- eq(ONE_SECOND.getValue().longValue()),
- eq(ONE_SECOND.getUnit().getTimeUnit()));
- invokeWorkWhenSubmitted();
- runnable.run();
-
- control.replay();
-
- gatedExecutor.execute(runnable, ONE_SECOND);
- gatedExecutor.flush();
- }
-}
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java b/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java
new file mode 100644
index 0000000..c62a1d5
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/async/GatingDelayExecutorTest.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.async;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
+
+import com.google.common.base.Throwables;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import org.apache.aurora.common.quantity.Amount;
+import org.apache.aurora.common.quantity.Time;
+import org.apache.aurora.common.testing.easymock.EasyMockTest;
+import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expectLastCall;
+import static org.junit.Assert.assertEquals;
+
+public class GatingDelayExecutorTest extends EasyMockTest {
+
+ private static final Amount<Long, Time> ONE_SECOND = Amount.of(1L, Time.SECONDS);
+
+ private ScheduledExecutorService gatedExecutor;
+ private Runnable runnable;
+ private GatingDelayExecutor gatingExecutor;
+
+ @Before
+ public void setUp() {
+ gatedExecutor = createMock(ScheduledExecutorService.class);
+ runnable = createMock(Runnable.class);
+ gatingExecutor = new GatingDelayExecutor(gatedExecutor);
+ }
+
+ @Test
+ public void testGateOpen() {
+ gatedExecutor.execute(runnable);
+
+ control.replay();
+
+ // The gate was not closed, so the work is executed immediately.
+ gatingExecutor.execute(runnable);
+ }
+
+ private IExpectationSetters<?> invokeWorkWhenSubmitted() {
+ return expectLastCall().andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() {
+ ((Runnable) EasyMock.getCurrentArguments()[0]).run();
+ return null;
+ }
+ });
+ }
+
+ @Test
+ public void testGateIsThreadSpecific() throws InterruptedException {
+ gatedExecutor.execute(runnable);
+
+ control.replay();
+
+ CountDownLatch gateClosed = new CountDownLatch(1);
+ CountDownLatch unblock = new CountDownLatch(1);
+ Runnable closer = new Runnable() {
+ @Override
+ public void run() {
+ gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() {
+ @Override
+ public String doWithGateClosed() {
+ gateClosed.countDown();
+ try {
+ unblock.await();
+ } catch (InterruptedException e) {
+ throw Throwables.propagate(e);
+ }
+ return "hi";
+ }
+ });
+ }
+ };
+ new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("GateTest")
+ .build()
+ .newThread(closer)
+ .start();
+
+ gateClosed.await();
+ gatingExecutor.execute(runnable);
+ assertQueueSize(0);
+ unblock.countDown();
+ }
+
+ private void assertQueueSize(int size) {
+ assertEquals(size, gatingExecutor.getQueueSize());
+ }
+
+ @Test
+ public void testReentrantClose() {
+ gatedExecutor.execute(runnable);
+ expectLastCall().times(3);
+
+ control.replay();
+
+ gatingExecutor.execute(runnable);
+ assertQueueSize(0);
+
+ String result = gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() {
+ @Override
+ public String doWithGateClosed() {
+ gatingExecutor.execute(runnable);
+ assertQueueSize(1);
+
+ String result = gatingExecutor.closeDuring(new GatedOperation<String, RuntimeException>() {
+ @Override
+ public String doWithGateClosed() {
+ gatingExecutor.execute(runnable);
+ assertQueueSize(2);
+ return "hello";
+ }
+ });
+ assertEquals("hello", result);
+
+ return "hi";
+ }
+ });
+ assertEquals("hi", result);
+ assertQueueSize(0);
+ }
+
+ @Test
+ public void testExecute() {
+ gatedExecutor.execute(runnable);
+ invokeWorkWhenSubmitted();
+ runnable.run();
+ expectLastCall();
+
+ control.replay();
+
+ gatingExecutor.execute(runnable);
+ }
+
+ @Test
+ public void testExecuteAfterDelay() {
+ gatedExecutor.schedule(
+ runnable,
+ ONE_SECOND.getValue().longValue(),
+ ONE_SECOND.getUnit().getTimeUnit());
+ invokeWorkWhenSubmitted();
+ runnable.run();
+
+ control.replay();
+
+ gatingExecutor.execute(runnable, ONE_SECOND);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
index 0c7da07..acd2cd1 100644
--- a/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/pruning/TaskHistoryPrunerTest.java
@@ -13,14 +13,19 @@
*/
package org.apache.aurora.scheduler.pruning;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
+import com.google.common.io.Closer;
+import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
@@ -44,7 +49,6 @@ import org.apache.aurora.gen.TaskEvent;
import org.apache.aurora.scheduler.async.AsyncModule;
import org.apache.aurora.scheduler.async.AsyncModule.AsyncExecutor;
import org.apache.aurora.scheduler.async.DelayExecutor;
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
import org.apache.aurora.scheduler.base.Tasks;
import org.apache.aurora.scheduler.events.PubsubEvent.TaskStateChange;
import org.apache.aurora.scheduler.pruning.TaskHistoryPruner.HistoryPrunnerSettings;
@@ -56,6 +60,7 @@ import org.apache.aurora.scheduler.testing.FakeStatsProvider;
import org.easymock.Capture;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -84,6 +89,7 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
private StateManager stateManager;
private StorageTestUtil storageUtil;
private TaskHistoryPruner pruner;
+ private Closer closer;
@Before
public void setUp() {
@@ -98,6 +104,12 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
clock,
new HistoryPrunnerSettings(ONE_DAY, ONE_MINUTE, PER_JOB_HISTORY),
storageUtil.storage);
+ closer = Closer.create();
+ }
+
+ @After
+ public void tearDownCloser() throws Exception {
+ closer.close();
}
@Test
@@ -242,6 +254,13 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
.setDaemon(true)
.setNameFormat("testThreadSafeEvents-executor")
.build());
+ closer.register(new Closeable() {
+ @Override
+ public void close() throws IOException {
+ MoreExecutors.shutdownAndAwaitTermination(realExecutor, 1L, TimeUnit.SECONDS);
+ }
+ });
+
Injector injector = Guice.createInjector(
new AsyncModule(realExecutor),
new AbstractModule() {
@@ -251,24 +270,16 @@ public class TaskHistoryPrunerTest extends EasyMockTest {
}
});
executor = injector.getInstance(Key.get(DelayExecutor.class, AsyncExecutor.class));
- FlushableWorkQueue flusher =
- injector.getInstance(Key.get(FlushableWorkQueue.class, AsyncExecutor.class));
pruner = buildPruner(executor);
- Command onDeleted = new Command() {
- @Override
- public void execute() {
- // The goal is to verify that the call does not deadlock. We do not care about the outcome.
- changeState(makeTask("b", ASSIGNED), STARTING);
- }
- };
+ // The goal is to verify that the call does not deadlock. We do not care about the outcome.
+ Command onDeleted = () -> changeState(makeTask("b", ASSIGNED), STARTING);
CountDownLatch taskDeleted = expectTaskDeleted(onDeleted, TASK_ID);
control.replay();
// Change the task to a terminal state and wait for it to be pruned.
changeState(makeTask(TASK_ID, RUNNING), KILLED);
- flusher.flush();
taskDeleted.await();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/e658a8a0/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
index 6dd5026..a0bd34b 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -13,11 +13,9 @@
*/
package org.apache.aurora.scheduler.storage.db;
-import java.util.concurrent.atomic.AtomicBoolean;
-
import org.apache.aurora.common.testing.easymock.EasyMockTest;
-
-import org.apache.aurora.scheduler.async.FlushableWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue;
+import org.apache.aurora.scheduler.async.GatedWorkQueue.GatedOperation;
import org.apache.aurora.scheduler.storage.AttributeStore;
import org.apache.aurora.scheduler.storage.CronJobStore;
import org.apache.aurora.scheduler.storage.JobUpdateStore;
@@ -34,20 +32,19 @@ import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.easymock.EasyMock;
import org.easymock.IAnswer;
+import org.easymock.IExpectationSetters;
import org.junit.Before;
import org.junit.Test;
import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
public class DbStorageTest extends EasyMockTest {
private SqlSessionFactory sessionFactory;
private SqlSession session;
private EnumValueMapper enumMapper;
- private FlushableWorkQueue flusher;
+ private GatedWorkQueue gatedWorkQueue;
private Work.Quiet<String> readWork;
private MutateWork.NoResult.Quiet writeWork;
@@ -58,14 +55,14 @@ public class DbStorageTest extends EasyMockTest {
sessionFactory = createMock(SqlSessionFactory.class);
session = createMock(SqlSession.class);
enumMapper = createMock(EnumValueMapper.class);
- flusher = createMock(FlushableWorkQueue.class);
+ gatedWorkQueue = createMock(GatedWorkQueue.class);
readWork = createMock(new Clazz<Work.Quiet<String>>() { });
writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { });
storage = new DbStorage(
sessionFactory,
enumMapper,
- flusher,
+ gatedWorkQueue,
createMock(CronJobStore.Mutable.class),
createMock(TaskStore.Mutable.class),
createMock(SchedulerStore.Mutable.class),
@@ -94,12 +91,23 @@ public class DbStorageTest extends EasyMockTest {
assertEquals("hi", storage.read(readWork));
}
+ private IExpectationSetters<?> expectGateClosed() throws Exception {
+ return expect(gatedWorkQueue.closeDuring(EasyMock.anyObject()))
+ .andAnswer(new IAnswer<Object>() {
+ @Override
+ public Object answer() throws Throwable {
+ GatedOperation<?, ?> op = (GatedOperation<?, ?>) EasyMock.getCurrentArguments()[0];
+ return op.doWithGateClosed();
+ }
+ });
+ }
+
@Test(expected = StorageException.class)
- public void testBulkLoadFails() {
+ public void testBulkLoadFails() throws Exception {
expect(sessionFactory.openSession(false)).andReturn(session);
expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException());
expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
- flusher.flush();
+ expectGateClosed();
control.replay();
@@ -107,13 +115,13 @@ public class DbStorageTest extends EasyMockTest {
}
@Test
- public void testBulkLoad() {
+ public void testBulkLoad() throws Exception {
expect(sessionFactory.openSession(false)).andReturn(session);
expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andReturn(0);
expect(writeWork.apply(EasyMock.anyObject())).andReturn(null);
session.close();
expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
- flusher.flush();
+ expectGateClosed();
control.replay();
@@ -121,16 +129,8 @@ public class DbStorageTest extends EasyMockTest {
}
@Test
- public void testFlushWithReentrantWrites() {
- final AtomicBoolean flushed = new AtomicBoolean(false);
- flusher.flush();
- expectLastCall().andAnswer(new IAnswer<Void>() {
- @Override
- public Void answer() {
- flushed.set(true);
- return null;
- }
- });
+ public void testGateWithReentrantWrites() throws Exception {
+ expectGateClosed().times(2);
control.replay();
@@ -138,9 +138,6 @@ public class DbStorageTest extends EasyMockTest {
@Override
public void execute(MutableStoreProvider storeProvider) {
noopWrite();
-
- // Should not have flushed yet.
- assertFalse("flush() should not be called until outer write() completes.", flushed.get());
}
});
}
@@ -155,9 +152,8 @@ public class DbStorageTest extends EasyMockTest {
}
@Test
- public void testFlushWithSeqentialWrites() {
- flusher.flush();
- expectLastCall().times(2);
+ public void testFlushWithSeqentialWrites() throws Exception {
+ expectGateClosed().times(2);
control.replay();