You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/04/22 00:49:23 UTC
aurora git commit: Add a specific storage routine for bulk loading
data.
Repository: aurora
Updated Branches:
refs/heads/master d10d2d171 -> ac8562489
Add a specific storage routine for bulk loading data.
Reviewed at https://reviews.apache.org/r/33273/
Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ac856248
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ac856248
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ac856248
Branch: refs/heads/master
Commit: ac8562489633c0dc0cd510757275a74e07559c58
Parents: d10d2d1
Author: Bill Farner <wf...@apache.org>
Authored: Tue Apr 21 15:45:48 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Apr 21 15:45:48 2015 -0700
----------------------------------------------------------------------
.../storage/CallOrderEnforcingStorage.java | 8 ++
.../aurora/scheduler/storage/Storage.java | 11 ++
.../aurora/scheduler/storage/db/DbStorage.java | 53 ++++++---
.../scheduler/storage/log/LogStorage.java | 30 ++++--
.../scheduler/storage/mem/MemStorage.java | 11 +-
.../app/local/FakeNonVolatileStorage.java | 7 ++
.../scheduler/storage/db/DbStorageTest.java | 108 +++++++++++++++++++
.../scheduler/storage/log/LogStorageTest.java | 30 ++++++
8 files changed, 234 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 07d81e4..64aa10d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -116,6 +116,14 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
}
@Override
+ public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+ throws StorageException, E {
+ // Checked against State.PREPARED, whereas write() below is checked against State.READY.
+ checkInState(State.PREPARED);
+ wrapped.bulkLoad(work);
+ }
+
+ @Override
public <T, E extends Exception> T write(MutateWork<T, E> work)
throws StorageException, E {
checkInState(State.READY);
http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 972a3c1..21f6a64 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -212,6 +212,17 @@ public interface Storage {
<T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E;
/**
+ * Recovers the contents of the storage, using the provided operation. Implementations may use
+ * relaxed transactional guarantees and may not support rollback.
+ *
+ * @param work Bulk load operation.
+ * @param <E> The type of exception this unit of work can throw.
+ * @throws StorageException if there was a problem reading from or writing to stable storage.
+ * @throws E bubbled transparently when the unit of work throws.
+ */
+ <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) throws StorageException, E;
+
+ /**
* Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing
* out of date data, etc. This method should not block.
*
http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 526df10..49db52d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -17,6 +17,7 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.io.CharStreams;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.inject.Inject;
@@ -34,9 +35,7 @@ import org.apache.aurora.scheduler.storage.Storage;
import org.apache.aurora.scheduler.storage.TaskStore;
import org.apache.ibatis.builder.StaticSqlSource;
import org.apache.ibatis.exceptions.PersistenceException;
-import org.apache.ibatis.logging.LogFactory;
import org.apache.ibatis.mapping.MappedStatement.Builder;
-import org.apache.ibatis.mapping.SqlCommandType;
import org.apache.ibatis.session.Configuration;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
@@ -44,6 +43,8 @@ import org.mybatis.guice.transactional.Transactional;
import static java.util.Objects.requireNonNull;
+import static org.apache.ibatis.mapping.SqlCommandType.UPDATE;
+
/**
* A storage implementation backed by a relational database.
* <p>
@@ -140,11 +141,41 @@ class DbStorage extends AbstractIdleService implements Storage {
}
}
+ @VisibleForTesting
+ static final String DISABLE_UNDO_LOG = "DISABLE_UNDO_LOG";
+ @VisibleForTesting
+ static final String ENABLE_UNDO_LOG = "ENABLE_UNDO_LOG";
+
+ // TODO(wfarner): Including @Transactional here seems to render the UNDO_LOG changes useless,
+ // resulting in no performance gain. Figure out why.
+ @Override
+ public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+ throws StorageException, E {
+
+ // Disabling the undo log gives up transaction rollback but makes bulk inserts much faster.
+ // Every PersistenceException, even from re-enabling the log, surfaces as a StorageException.
+ try (SqlSession session = sessionFactory.openSession(false)) {
+ try {
+ session.update(DISABLE_UNDO_LOG);
+ work.apply(storeProvider);
+ } finally {
+ session.update(ENABLE_UNDO_LOG);
+ }
+ } catch (PersistenceException e) {
+ throw new StorageException(e.getMessage(), e);
+ }
+ }
+
@Override
public void prepare() {
startAsync().awaitRunning();
}
+ private static void addMappedStatement(Configuration configuration, String name, String sql) {
+ StaticSqlSource source = new StaticSqlSource(configuration, sql);
+ configuration.addMappedStatement(new Builder(configuration, name, source, UPDATE).build());
+ }
+
/**
* Creates the SQL schema during service start-up.
* Note: This design assumes a volatile database engine.
@@ -152,22 +183,18 @@ class DbStorage extends AbstractIdleService implements Storage {
@Override
@Transactional
protected void startUp() throws IOException {
- LogFactory.useJdkLogging();
-
Configuration configuration = sessionFactory.getConfiguration();
String createStatementName = "create_tables";
configuration.setMapUnderscoreToCamelCase(true);
- configuration.addMappedStatement(new Builder(
+
+ addMappedStatement(
configuration,
createStatementName,
- new StaticSqlSource(
- configuration,
- CharStreams.toString(
- new InputStreamReader(
- DbStorage.class.getResourceAsStream("schema.sql"),
- StandardCharsets.UTF_8))),
- SqlCommandType.UPDATE)
- .build());
+ CharStreams.toString(new InputStreamReader(
+ DbStorage.class.getResourceAsStream("schema.sql"),
+ StandardCharsets.UTF_8)));
+ addMappedStatement(configuration, DISABLE_UNDO_LOG, "SET UNDO_LOG 0;");
+ addMappedStatement(configuration, ENABLE_UNDO_LOG, "SET UNDO_LOG 1;");
try (SqlSession session = sessionFactory.openSession()) {
session.update(createStatementName);
http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 63b5b1f..bb59cdf 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -538,16 +538,21 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
@Timed("scheduler_log_recover")
void recover() throws RecoveryFailedException {
- try {
- streamManager.readFromBeginning(new Closure<LogEntry>() {
- @Override
- public void execute(LogEntry logEntry) {
- replay(logEntry);
+ writeBehindStorage.bulkLoad(new MutateWork.NoResult.Quiet() {
+ @Override
+ protected void execute(MutableStoreProvider storeProvider) {
+ try {
+ streamManager.readFromBeginning(new Closure<LogEntry>() {
+ @Override
+ public void execute(LogEntry logEntry) {
+ replay(logEntry);
+ }
+ });
+ } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+ throw new RecoveryFailedException(e);
}
- });
- } catch (CodingException | InvalidPositionException | StreamAccessException e) {
- throw new RecoveryFailedException(e);
- }
+ }
+ });
}
private static final class RecoveryFailedException extends SchedulerException {
@@ -679,6 +684,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
}
@Override
+ public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+ throws StorageException, E {
+ // recover() bulk-loads into writeBehindStorage; this storage itself rejects the call.
+ throw new UnsupportedOperationException("Log storage may not be populated in bulk.");
+ }
+
+ @Override
public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
return writeBehindStorage.read(work);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
index dafe1c4..c5ccccd 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
@@ -128,8 +128,7 @@ public class MemStorage implements Storage {
@Timed("mem_storage_read_operation")
@Override
- public <T, E extends Exception> T read(final Work<T, E> work)
- throws StorageException, E {
+ public <T, E extends Exception> T read(final Work<T, E> work) throws StorageException, E {
return delegatedStore.read(new Work<T, E>() {
@Override
public T apply(StoreProvider provider) throws E {
@@ -149,6 +148,14 @@ public class MemStorage implements Storage {
});
}
+ @Timed("mem_storage_bulk_load_operation")
+ @Override
+ public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+ throws StorageException, E {
+ // Hands the work to delegatedStore as-is; unlike read(), it is not wrapped in another Work.
+ delegatedStore.bulkLoad(work);
+ }
+
@Override
public void prepare() throws StorageException {
delegatedStore.prepare();
http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
index 3336f8c..0768ec3 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
@@ -51,6 +51,13 @@ class FakeNonVolatileStorage implements NonVolatileStorage {
}
@Override
+ public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+ throws StorageException, E {
+ // Forwarded unchanged to the delegate, in the same way as write() below.
+ delegate.bulkLoad(work);
+ }
+
+ @Override
public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
return delegate.write(work);
}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
new file mode 100644
index 0000000..743f5ba
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.ibatis.exceptions.PersistenceException;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class DbStorageTest extends EasyMockTest {
+
+ private SqlSessionFactory sessionFactory;
+ private SqlSession session;
+ private EnumValueMapper enumMapper;
+ private Work.Quiet<String> readWork;
+ private MutateWork.NoResult.Quiet writeWork;
+
+ private DbStorage storage;
+
+ @Before
+ public void setUp() {
+ sessionFactory = createMock(SqlSessionFactory.class);
+ session = createMock(SqlSession.class);
+ enumMapper = createMock(EnumValueMapper.class);
+ readWork = createMock(new Clazz<Work.Quiet<String>>() { });
+ writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { });
+
+ storage = new DbStorage(
+ sessionFactory,
+ enumMapper,
+ createMock(SchedulerStore.Mutable.class),
+ createMock(AttributeStore.Mutable.class),
+ createMock(LockStore.Mutable.class),
+ createMock(QuotaStore.Mutable.class),
+ createMock(JobUpdateStore.Mutable.class));
+ }
+
+ @Test(expected = StorageException.class)
+ public void testReadFails() {
+ expect(readWork.apply(EasyMock.<StoreProvider>anyObject()))
+ .andThrow(new PersistenceException());
+
+ control.replay();
+
+ storage.read(readWork);
+ }
+
+ @Test
+ public void testRead() {
+ expect(readWork.apply(EasyMock.<StoreProvider>anyObject())).andReturn("hi");
+
+ control.replay();
+
+ assertEquals("hi", storage.read(readWork));
+ }
+
+ @Test(expected = StorageException.class)
+ public void testBulkLoadFails() {
+ expect(sessionFactory.openSession(false)).andReturn(session);
+ expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException());
+ expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
+
+ control.replay();
+
+ storage.bulkLoad(writeWork);
+ }
+
+ @Test
+ public void testBulkLoad() {
+ expect(sessionFactory.openSession(false)).andReturn(session);
+ expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andReturn(0);
+ writeWork.apply(EasyMock.<MutableStoreProvider>anyObject());
+ expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
+ session.close();
+
+ control.replay();
+
+ storage.bulkLoad(writeWork);
+ }
+}
http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index cb6ba25..cbc2d38 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -109,6 +109,7 @@ import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
import org.easymock.Capture;
+import org.easymock.EasyMock;
import org.easymock.IAnswer;
import org.junit.Before;
import org.junit.Test;
@@ -422,6 +423,15 @@ public class LogStorageTest extends EasyMockTest {
expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry));
}
+ storageUtil.storage.bulkLoad(EasyMock.<MutateWork.NoResult<?>>anyObject());
+ expectLastCall().andAnswer(new IAnswer<MutateWork.NoResult<?>>() {
+ @Override
+ public NoResult<?> answer() throws Throwable {
+ MutateWork.NoResult work = (MutateWork.NoResult<?>) EasyMock.getCurrentArguments()[0];
+ work.apply(storageUtil.mutableStoreProvider);
+ return null;
+ }
+ });
expect(stream.readAll()).andReturn(entryBuilder.build().iterator());
}
@@ -467,6 +477,15 @@ public class LogStorageTest extends EasyMockTest {
}
});
+ storageUtil.storage.bulkLoad(EasyMock.<MutateWork.NoResult<?>>anyObject());
+ expectLastCall().andAnswer(new IAnswer<MutateWork.NoResult<?>>() {
+ @Override
+ public NoResult<?> answer() throws Throwable {
+ MutateWork.NoResult work = (MutateWork.NoResult<?>) EasyMock.getCurrentArguments()[0];
+ work.apply(storageUtil.mutableStoreProvider);
+ return null;
+ }
+ });
expect(stream.readAll()).andReturn(Iterators.<Entry>emptyIterator());
final Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
@@ -1033,6 +1052,17 @@ public class LogStorageTest extends EasyMockTest {
}.run();
}
+ @Test(expected = UnsupportedOperationException.class)
+ public void testBulkLoad() throws Exception {
+ expect(log.open()).andReturn(stream);
+ MutateWork.NoResult.Quiet load = createMock(new Clazz<NoResult.Quiet>() { });
+
+ control.replay();
+
+ logStorage.prepare();
+ logStorage.bulkLoad(load);
+ }
+
private LogEntry createTransaction(Op... ops) {
return LogEntry.transaction(
new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));