You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain text copy.
Posted to commits@aurora.apache.org by ma...@apache.org on 2016/03/04 03:36:38 UTC
aurora git commit: Dropping bulkLoad() from Storage
Repository: aurora
Updated Branches:
refs/heads/master 98a2bc194 -> 3a3415e1c
Dropping bulkLoad() from Storage
Bugs closed: AURORA-1324
Reviewed at https://reviews.apache.org/r/44368/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/3a3415e1
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/3a3415e1
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/3a3415e1
Branch: refs/heads/master
Commit: 3a3415e1c8b4b5442480454ed664ea44c62e862d
Parents: 98a2bc1
Author: Maxim Khutornenko <ma...@apache.org>
Authored: Thu Mar 3 18:36:18 2016 -0800
Committer: Maxim Khutornenko <ma...@apache.org>
Committed: Thu Mar 3 18:36:18 2016 -0800
----------------------------------------------------------------------
.../aurora/benchmark/StatusUpdateBenchmark.java | 8 -----
.../aurora/benchmark/ThriftApiBenchmarks.java | 2 +-
.../storage/CallOrderEnforcingStorage.java | 8 -----
.../aurora/scheduler/storage/Storage.java | 13 +------
.../aurora/scheduler/storage/db/DbStorage.java | 32 -----------------
.../scheduler/storage/log/LogStorage.java | 19 +++-------
.../app/local/FakeNonVolatileStorage.java | 7 ----
.../scheduler/storage/db/DbStorageTest.java | 37 +-------------------
.../scheduler/storage/log/LogStorageTest.java | 24 -------------
9 files changed, 8 insertions(+), 142 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
index dc1ef82..f408453 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/StatusUpdateBenchmark.java
@@ -142,14 +142,6 @@ public class StatusUpdateBenchmark {
}
@Override
- public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
- throws StorageException, E {
-
- maybeSleep();
- underlyingStorage.bulkLoad(work);
- }
-
- @Override
public void prepare() throws StorageException {
underlyingStorage.prepare();
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
----------------------------------------------------------------------
diff --git a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
index 293b88f..6074638 100644
--- a/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
+++ b/src/jmh/java/org/apache/aurora/benchmark/ThriftApiBenchmarks.java
@@ -167,7 +167,7 @@ public class ThriftApiBenchmarks {
private static void bulkLoadTasks(Storage storage, final TestConfiguration config) {
// Ideally we would use the API to populate the storage, but wiring in the writable thrift
// interface requires considerably more binding setup.
- storage.bulkLoad(storeProvider -> {
+ storage.write((Storage.MutateWork.NoResult.Quiet) storeProvider -> {
for (int roleId = 0; roleId < config.roles; roleId++) {
String role = "role" + roleId;
for (int envId = 0; envId < config.envs; envId++) {
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index de4ada4..2a5ec9c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -114,14 +114,6 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
}
@Override
- public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
- throws StorageException, E {
-
- checkInState(State.PREPARED);
- wrapped.bulkLoad(work);
- }
-
- @Override
public <T, E extends Exception> T write(MutateWork<T, E> work)
throws StorageException, E {
checkInState(State.READY);
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 578bb37..5124d17 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -221,17 +221,6 @@ public interface Storage {
<T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E;
/**
- * Recovers the contents of the storage, using the provided operation. This may be done with
- * relaxed transactional guarantees and/or rollback support.
- *
- * @param work Bulk load operation.
- * @param <E> The type of exception this unit of work can throw.
- * @throws StorageException if there was a problem reading from or writing to stable storage.
- * @throws E bubbled transparently when the unit of work throws
- */
- <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) throws StorageException, E;
-
- /**
* Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing
* out of date data, etc. This method should not block.
*
@@ -273,7 +262,7 @@ public interface Storage {
@Retention(RetentionPolicy.RUNTIME)
@Target({ ElementType.PARAMETER, ElementType.METHOD })
@Qualifier
- public @interface Volatile { }
+ @interface Volatile { }
/**
* Utility functions for interacting with a Storage instance.
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index c0f8d35..cca92dd 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -19,7 +19,6 @@ import java.nio.charset.StandardCharsets;
import javax.sql.DataSource;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Supplier;
import com.google.common.io.CharStreams;
import com.google.common.util.concurrent.AbstractIdleService;
@@ -173,35 +172,6 @@ class DbStorage extends AbstractIdleService implements Storage {
});
}
- @VisibleForTesting
- static final String DISABLE_UNDO_LOG = "DISABLE_UNDO_LOG";
- @VisibleForTesting
- static final String ENABLE_UNDO_LOG = "ENABLE_UNDO_LOG";
-
- // TODO(wfarner): Including @Transactional here seems to render the UNDO_LOG changes useless,
- // resulting in no performance gain. Figure out why.
- @Timed("db_storage_bulk_load_operation")
- @Override
- public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
- throws StorageException, E {
-
- gatedWorkQueue.closeDuring(() -> {
- // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk
- // insert.
- try (SqlSession session = sessionFactory.openSession(false)) {
- try {
- session.update(DISABLE_UNDO_LOG);
- work.apply(storeProvider);
- } catch (PersistenceException e) {
- throw new StorageException(e.getMessage(), e);
- } finally {
- session.update(ENABLE_UNDO_LOG);
- }
- }
- return null;
- });
- }
-
@Override
public void prepare() {
startAsync().awaitRunning();
@@ -233,8 +203,6 @@ class DbStorage extends AbstractIdleService implements Storage {
CharStreams.toString(new InputStreamReader(
DbStorage.class.getResourceAsStream("schema.sql"),
StandardCharsets.UTF_8)));
- addMappedStatement(configuration, DISABLE_UNDO_LOG, "SET UNDO_LOG 0;");
- addMappedStatement(configuration, ENABLE_UNDO_LOG, "SET UNDO_LOG 1;");
try (SqlSession session = sessionFactory.openSession()) {
session.update(createStatementName);
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 243c8a0..5143668 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -432,13 +432,11 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
@Timed("scheduler_log_recover")
void recover() throws RecoveryFailedException {
- writeBehindStorage.bulkLoad(storeProvider -> {
- try {
- streamManager.readFromBeginning(LogStorage.this::replay);
- } catch (CodingException | InvalidPositionException | StreamAccessException e) {
- throw new RecoveryFailedException(e);
- }
- });
+ try {
+ streamManager.readFromBeginning(LogStorage.this::replay);
+ } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+ throw new RecoveryFailedException(e);
+ }
}
private static final class RecoveryFailedException extends SchedulerException {
@@ -559,13 +557,6 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
}
@Override
- public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
- throws StorageException, E {
-
- throw new UnsupportedOperationException("Log storage may not be populated in bulk.");
- }
-
- @Override
public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
return writeBehindStorage.read(work);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
index 0768ec3..3336f8c 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
@@ -51,13 +51,6 @@ class FakeNonVolatileStorage implements NonVolatileStorage {
}
@Override
- public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
- throws StorageException, E {
-
- delegate.bulkLoad(work);
- }
-
- @Override
public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
return delegate.write(work);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
index 420d444..f26529c 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -23,13 +23,11 @@ import org.apache.aurora.scheduler.storage.JobUpdateStore;
import org.apache.aurora.scheduler.storage.LockStore;
import org.apache.aurora.scheduler.storage.QuotaStore;
import org.apache.aurora.scheduler.storage.SchedulerStore;
-import org.apache.aurora.scheduler.storage.Storage.MutateWork;
import org.apache.aurora.scheduler.storage.Storage.MutateWork.NoResult;
import org.apache.aurora.scheduler.storage.Storage.StorageException;
import org.apache.aurora.scheduler.storage.Storage.Work;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.ibatis.exceptions.PersistenceException;
-import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.easymock.EasyMock;
import org.easymock.IExpectationSetters;
@@ -40,26 +38,19 @@ import static org.easymock.EasyMock.expect;
import static org.junit.Assert.assertEquals;
public class DbStorageTest extends EasyMockTest {
-
- private SqlSessionFactory sessionFactory;
- private SqlSession session;
private GatedWorkQueue gatedWorkQueue;
private Work.Quiet<String> readWork;
- private MutateWork.NoResult.Quiet writeWork;
private DbStorage storage;
@Before
public void setUp() {
- sessionFactory = createMock(SqlSessionFactory.class);
- session = createMock(SqlSession.class);
gatedWorkQueue = createMock(GatedWorkQueue.class);
readWork = createMock(new Clazz<Work.Quiet<String>>() { });
- writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { });
StatsProvider statsProvider = createMock(StatsProvider.class);
storage = new DbStorage(
- sessionFactory,
+ createMock(SqlSessionFactory.class),
createMock(EnumValueMapper.class),
gatedWorkQueue,
createMock(CronJobStore.Mutable.class),
@@ -99,32 +90,6 @@ public class DbStorageTest extends EasyMockTest {
});
}
- @Test(expected = StorageException.class)
- public void testBulkLoadFails() throws Exception {
- expect(sessionFactory.openSession(false)).andReturn(session);
- expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException());
- expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
- expectGateClosed();
-
- control.replay();
-
- storage.bulkLoad(writeWork);
- }
-
- @Test
- public void testBulkLoad() throws Exception {
- expect(sessionFactory.openSession(false)).andReturn(session);
- expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andReturn(0);
- expect(writeWork.apply(EasyMock.anyObject())).andReturn(null);
- session.close();
- expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
- expectGateClosed();
-
- control.replay();
-
- storage.bulkLoad(writeWork);
- }
-
@Test
public void testGateWithReentrantWrites() throws Exception {
expectGateClosed().times(2);
http://git-wip-us.apache.org/repos/asf/aurora/blob/3a3415e1/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index 7382eca..bf9479d 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -110,7 +110,6 @@ import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
-import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
@@ -369,12 +368,6 @@ public class LogStorageTest extends EasyMockTest {
expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry));
}
- storageUtil.storage.bulkLoad(EasyMock.anyObject());
- expectLastCall().andAnswer(() -> {
- NoResult work = (NoResult<?>) EasyMock.getCurrentArguments()[0];
- work.apply(storageUtil.mutableStoreProvider);
- return null;
- });
expect(stream.readAll()).andReturn(entryBuilder.build().iterator());
}
@@ -414,12 +407,6 @@ public class LogStorageTest extends EasyMockTest {
return null;
});
- storageUtil.storage.bulkLoad(EasyMock.anyObject());
- expectLastCall().andAnswer(() -> {
- NoResult work = (NoResult<?>) EasyMock.getCurrentArguments()[0];
- work.apply(storageUtil.mutableStoreProvider);
- return null;
- });
expect(stream.readAll()).andReturn(Collections.emptyIterator());
Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
@@ -964,17 +951,6 @@ public class LogStorageTest extends EasyMockTest {
}.run();
}
- @Test(expected = UnsupportedOperationException.class)
- public void testBulkLoad() throws Exception {
- expect(log.open()).andReturn(stream);
- MutateWork.NoResult.Quiet load = createMock(new Clazz<NoResult.Quiet>() { });
-
- control.replay();
-
- logStorage.prepare();
- logStorage.bulkLoad(load);
- }
-
private LogEntry createTransaction(Op... ops) {
return LogEntry.transaction(
new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));