You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2015/04/22 00:49:23 UTC

aurora git commit: Add a specific storage routine for bulk loading data.

Repository: aurora
Updated Branches:
  refs/heads/master d10d2d171 -> ac8562489


Add a specific storage routine for bulk loading data.

Reviewed at https://reviews.apache.org/r/33273/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/ac856248
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/ac856248
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/ac856248

Branch: refs/heads/master
Commit: ac8562489633c0dc0cd510757275a74e07559c58
Parents: d10d2d1
Author: Bill Farner <wf...@apache.org>
Authored: Tue Apr 21 15:45:48 2015 -0700
Committer: Bill Farner <wf...@apache.org>
Committed: Tue Apr 21 15:45:48 2015 -0700

----------------------------------------------------------------------
 .../storage/CallOrderEnforcingStorage.java      |   8 ++
 .../aurora/scheduler/storage/Storage.java       |  11 ++
 .../aurora/scheduler/storage/db/DbStorage.java  |  53 ++++++---
 .../scheduler/storage/log/LogStorage.java       |  30 ++++--
 .../scheduler/storage/mem/MemStorage.java       |  11 +-
 .../app/local/FakeNonVolatileStorage.java       |   7 ++
 .../scheduler/storage/db/DbStorageTest.java     | 108 +++++++++++++++++++
 .../scheduler/storage/log/LogStorageTest.java   |  30 ++++++
 8 files changed, 234 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
index 07d81e4..64aa10d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/CallOrderEnforcingStorage.java
@@ -116,6 +116,14 @@ public class CallOrderEnforcingStorage implements NonVolatileStorage {
   }
 
   @Override
+  public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+      throws StorageException, E {
+    // Guards on the PREPARED state, whereas write() below guards on READY.
+    checkInState(State.PREPARED);
+    wrapped.bulkLoad(work);
+  }
+
+  @Override
   public <T, E extends Exception> T write(MutateWork<T, E> work)
       throws StorageException, E {
     checkInState(State.READY);

http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
index 972a3c1..21f6a64 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/Storage.java
@@ -212,6 +212,17 @@ public interface Storage {
   <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E;
 
   /**
+   * Recovers the contents of the storage, using the provided operation. This may be done with
+   * relaxed transactional guarantees and/or without rollback support.
+   *
+   * @param work Bulk load operation.
+   * @param <E> The type of exception this unit of work can throw.
+   * @throws StorageException if there was a problem reading from or writing to stable storage.
+   * @throws E bubbled transparently when the unit of work throws.
+   */
+  <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work) throws StorageException, E;
+
+  /**
    * Requests the underlying storage prepare its data set; ie: initialize schemas, begin syncing
    * out of date data, etc.  This method should not block.
    *

http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
index 526df10..49db52d 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/db/DbStorage.java
@@ -17,6 +17,7 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.io.CharStreams;
 import com.google.common.util.concurrent.AbstractIdleService;
 import com.google.inject.Inject;
@@ -34,9 +35,7 @@ import org.apache.aurora.scheduler.storage.Storage;
 import org.apache.aurora.scheduler.storage.TaskStore;
 import org.apache.ibatis.builder.StaticSqlSource;
 import org.apache.ibatis.exceptions.PersistenceException;
-import org.apache.ibatis.logging.LogFactory;
 import org.apache.ibatis.mapping.MappedStatement.Builder;
-import org.apache.ibatis.mapping.SqlCommandType;
 import org.apache.ibatis.session.Configuration;
 import org.apache.ibatis.session.SqlSession;
 import org.apache.ibatis.session.SqlSessionFactory;
@@ -44,6 +43,8 @@ import org.mybatis.guice.transactional.Transactional;
 
 import static java.util.Objects.requireNonNull;
 
+import static org.apache.ibatis.mapping.SqlCommandType.UPDATE;
+
 /**
  * A storage implementation backed by a relational database.
  * <p>
@@ -140,11 +141,41 @@ class DbStorage extends AbstractIdleService implements Storage {
     }
   }
 
+  @VisibleForTesting
+  static final String DISABLE_UNDO_LOG = "DISABLE_UNDO_LOG";
+  @VisibleForTesting
+  static final String ENABLE_UNDO_LOG = "ENABLE_UNDO_LOG";
+
+  // TODO(wfarner): Including @Transactional here seems to render the UNDO_LOG changes useless,
+  // resulting in no performance gain.  Figure out why.
+  @Override
+  public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+      throws StorageException, E {
+
+    // Disabling the undo log disables transaction rollback, but dramatically speeds up a bulk
+    // insert. PersistenceExceptions, even from re-enabling the log, become StorageExceptions.
+    try (SqlSession session = sessionFactory.openSession(false)) {
+      try {
+        session.update(DISABLE_UNDO_LOG);
+        work.apply(storeProvider);
+      } finally {
+        session.update(ENABLE_UNDO_LOG);
+      }
+    } catch (PersistenceException e) {
+      throw new StorageException(e.getMessage(), e);
+    }
+  }
+
   @Override
   public void prepare() {
     startAsync().awaitRunning();
   }
 
+  private static void addMappedStatement(Configuration configuration, String name, String sql) {
+    StaticSqlSource source = new StaticSqlSource(configuration, sql);
+    configuration.addMappedStatement(new Builder(configuration, name, source, UPDATE).build());
+  }
+
   /**
    * Creates the SQL schema during service start-up.
    * Note: This design assumes a volatile database engine.
@@ -152,22 +183,18 @@ class DbStorage extends AbstractIdleService implements Storage {
   @Override
   @Transactional
   protected void startUp() throws IOException {
-    LogFactory.useJdkLogging();
-
     Configuration configuration = sessionFactory.getConfiguration();
     String createStatementName = "create_tables";
     configuration.setMapUnderscoreToCamelCase(true);
-    configuration.addMappedStatement(new Builder(
+
+    addMappedStatement(
         configuration,
         createStatementName,
-        new StaticSqlSource(
-            configuration,
-            CharStreams.toString(
-                new InputStreamReader(
-                    DbStorage.class.getResourceAsStream("schema.sql"),
-                    StandardCharsets.UTF_8))),
-        SqlCommandType.UPDATE)
-        .build());
+        CharStreams.toString(new InputStreamReader(
+            DbStorage.class.getResourceAsStream("schema.sql"),
+            StandardCharsets.UTF_8)));
+    addMappedStatement(configuration, DISABLE_UNDO_LOG, "SET UNDO_LOG 0;");
+    addMappedStatement(configuration, ENABLE_UNDO_LOG, "SET UNDO_LOG 1;");
 
     try (SqlSession session = sessionFactory.openSession()) {
       session.update(createStatementName);

http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
index 63b5b1f..bb59cdf 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogStorage.java
@@ -538,16 +538,21 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
 
   @Timed("scheduler_log_recover")
   void recover() throws RecoveryFailedException {
-    try {
-      streamManager.readFromBeginning(new Closure<LogEntry>() {
-        @Override
-        public void execute(LogEntry logEntry) {
-          replay(logEntry);
+    writeBehindStorage.bulkLoad(new MutateWork.NoResult.Quiet() {  // Replay entire log in bulk.
+      @Override
+      protected void execute(MutableStoreProvider storeProvider) {
+        try {
+          streamManager.readFromBeginning(new Closure<LogEntry>() {
+            @Override
+            public void execute(LogEntry logEntry) {
+              replay(logEntry);
+            }
+          });
+        } catch (CodingException | InvalidPositionException | StreamAccessException e) {
+          throw new RecoveryFailedException(e);
         }
-      });
-    } catch (CodingException | InvalidPositionException | StreamAccessException e) {
-      throw new RecoveryFailedException(e);
-    }
+      }
+    });
   }
 
   private static final class RecoveryFailedException extends SchedulerException {
@@ -679,6 +684,13 @@ public class LogStorage implements NonVolatileStorage, DistributedSnapshotStore
   }
 
   @Override
+  public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+      throws StorageException, E {
+    // recover() bulk loads writeBehindStorage directly; the log itself is never bulk loaded.
+    throw new UnsupportedOperationException("Log storage may not be populated in bulk.");
+  }
+
+  @Override
   public <T, E extends Exception> T read(Work<T, E> work) throws StorageException, E {
     return writeBehindStorage.read(work);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
index dafe1c4..c5ccccd 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/mem/MemStorage.java
@@ -128,8 +128,7 @@ public class MemStorage implements Storage {
 
   @Timed("mem_storage_read_operation")
   @Override
-  public <T, E extends Exception> T read(final Work<T, E> work)
-          throws StorageException, E {
+  public <T, E extends Exception> T read(final Work<T, E> work) throws StorageException, E {
     return delegatedStore.read(new Work<T, E>() {
       @Override
       public T apply(StoreProvider provider) throws E {
@@ -149,6 +148,14 @@ public class MemStorage implements Storage {
     });
   }
 
+  @Timed("mem_storage_bulk_load_operation")
+  @Override
+  public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+      throws StorageException, E {
+    // NOTE(review): unlike read(), the work is passed to the delegate unwrapped; confirm intended.
+    delegatedStore.bulkLoad(work);
+  }
+
   @Override
   public void prepare() throws StorageException {
     delegatedStore.prepare();

http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
index 3336f8c..0768ec3 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/local/FakeNonVolatileStorage.java
@@ -51,6 +51,13 @@ class FakeNonVolatileStorage implements NonVolatileStorage {
   }
 
   @Override
+  public <E extends Exception> void bulkLoad(MutateWork.NoResult<E> work)
+      throws StorageException, E {
+    // Pass-through to the delegate, as write() below does.
+    delegate.bulkLoad(work);
+  }
+
+  @Override
   public <T, E extends Exception> T write(MutateWork<T, E> work) throws StorageException, E {
     return delegate.write(work);
   }

http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
new file mode 100644
index 0000000..743f5ba
--- /dev/null
+++ b/src/test/java/org/apache/aurora/scheduler/storage/db/DbStorageTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.aurora.scheduler.storage.db;
+
+import com.twitter.common.testing.easymock.EasyMockTest;
+
+import org.apache.aurora.scheduler.storage.AttributeStore;
+import org.apache.aurora.scheduler.storage.JobUpdateStore;
+import org.apache.aurora.scheduler.storage.LockStore;
+import org.apache.aurora.scheduler.storage.QuotaStore;
+import org.apache.aurora.scheduler.storage.SchedulerStore;
+import org.apache.aurora.scheduler.storage.Storage.MutableStoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.MutateWork;
+import org.apache.aurora.scheduler.storage.Storage.StorageException;
+import org.apache.aurora.scheduler.storage.Storage.StoreProvider;
+import org.apache.aurora.scheduler.storage.Storage.Work;
+import org.apache.ibatis.exceptions.PersistenceException;
+import org.apache.ibatis.session.SqlSession;
+import org.apache.ibatis.session.SqlSessionFactory;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.easymock.EasyMock.expect;
+import static org.junit.Assert.assertEquals;
+
+public class DbStorageTest extends EasyMockTest {
+
+  private SqlSessionFactory sessionFactory;
+  private SqlSession session;
+  private EnumValueMapper enumMapper;
+  private Work.Quiet<String> readWork;
+  private MutateWork.NoResult.Quiet writeWork;
+  private DbStorage storage;
+
+  @Before
+  public void setUp() {
+    sessionFactory = createMock(SqlSessionFactory.class);
+    session = createMock(SqlSession.class);
+    enumMapper = createMock(EnumValueMapper.class);
+    readWork = createMock(new Clazz<Work.Quiet<String>>() { });
+    writeWork = createMock(new Clazz<MutateWork.NoResult.Quiet>() { });
+
+    storage = new DbStorage(
+        sessionFactory,
+        enumMapper,
+        createMock(SchedulerStore.Mutable.class),
+        createMock(AttributeStore.Mutable.class),
+        createMock(LockStore.Mutable.class),
+        createMock(QuotaStore.Mutable.class),
+        createMock(JobUpdateStore.Mutable.class));
+  }
+
+  @Test(expected = StorageException.class)
+  public void testReadFails() {
+    expect(readWork.apply(EasyMock.<StoreProvider>anyObject()))
+        .andThrow(new PersistenceException());
+
+    control.replay();
+
+    storage.read(readWork);
+  }
+
+  @Test
+  public void testRead() {
+    expect(readWork.apply(EasyMock.<StoreProvider>anyObject())).andReturn("hi");
+
+    control.replay();
+
+    assertEquals("hi", storage.read(readWork));
+  }
+
+  @Test(expected = StorageException.class)
+  public void testBulkLoadFails() {
+    expect(sessionFactory.openSession(false)).andReturn(session);
+    expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andThrow(new PersistenceException());
+    expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
+    session.close();  // try-with-resources still closes the session when the load fails.
+
+    control.replay();
+
+    storage.bulkLoad(writeWork);
+  }
+
+  @Test
+  public void testBulkLoad() {
+    expect(sessionFactory.openSession(false)).andReturn(session);
+    expect(session.update(DbStorage.DISABLE_UNDO_LOG)).andReturn(0);
+    writeWork.apply(EasyMock.<MutableStoreProvider>anyObject());
+    expect(session.update(DbStorage.ENABLE_UNDO_LOG)).andReturn(0);
+    session.close();
+
+    control.replay();
+
+    storage.bulkLoad(writeWork);
+  }
+}

http://git-wip-us.apache.org/repos/asf/aurora/blob/ac856248/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
index cb6ba25..cbc2d38 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogStorageTest.java
@@ -109,6 +109,7 @@ import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher;
 import org.apache.aurora.scheduler.storage.log.testing.LogOpMatcher.StreamMatcher;
 import org.apache.aurora.scheduler.storage.testing.StorageTestUtil;
 import org.easymock.Capture;
+import org.easymock.EasyMock;
 import org.easymock.IAnswer;
 import org.junit.Before;
 import org.junit.Test;
@@ -422,6 +423,15 @@ public class LogStorageTest extends EasyMockTest {
       expect(entry.contents()).andReturn(ThriftBinaryCodec.encodeNonNull(logEntry));
     }
 
+    storageUtil.storage.bulkLoad(EasyMock.<MutateWork.NoResult<?>>anyObject());
+    expectLastCall().andAnswer(new IAnswer<MutateWork.NoResult<?>>() {
+      @Override
+      public NoResult<?> answer() throws Throwable {
+        MutateWork.NoResult work = (MutateWork.NoResult<?>) EasyMock.getCurrentArguments()[0];
+        work.apply(storageUtil.mutableStoreProvider);
+        return null;
+      }
+    });
     expect(stream.readAll()).andReturn(entryBuilder.build().iterator());
   }
 
@@ -467,6 +477,15 @@ public class LogStorageTest extends EasyMockTest {
         }
       });
 
+      storageUtil.storage.bulkLoad(EasyMock.<MutateWork.NoResult<?>>anyObject());
+      expectLastCall().andAnswer(new IAnswer<MutateWork.NoResult<?>>() {
+        @Override
+        public NoResult<?> answer() throws Throwable {
+          MutateWork.NoResult work = (MutateWork.NoResult<?>) EasyMock.getCurrentArguments()[0];
+          work.apply(storageUtil.mutableStoreProvider);
+          return null;
+        }
+      });
       expect(stream.readAll()).andReturn(Iterators.<Entry>emptyIterator());
       final Capture<MutateWork<Void, RuntimeException>> recoveryWork = createCapture();
       expect(storageUtil.storage.write(capture(recoveryWork))).andAnswer(
@@ -1033,6 +1052,17 @@ public class LogStorageTest extends EasyMockTest {
     }.run();
   }
 
+  @Test(expected = UnsupportedOperationException.class)
+  public void testBulkLoad() throws Exception {
+    NoResult.Quiet bulkWork = createMock(new Clazz<NoResult.Quiet>() { });
+    expect(log.open()).andReturn(stream);
+
+    control.replay();
+
+    logStorage.prepare();
+    logStorage.bulkLoad(bulkWork);
+  }
+
   private LogEntry createTransaction(Op... ops) {
     return LogEntry.transaction(
         new Transaction(ImmutableList.copyOf(ops), storageConstants.CURRENT_SCHEMA_VERSION));