You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by wf...@apache.org on 2017/12/03 14:50:49 UTC

aurora git commit: Remove redundant transaction recorder

Repository: aurora
Updated Branches:
  refs/heads/master cea43db9d -> 5e008ff7f


Remove redundant transaction recorder

Reviewed at https://reviews.apache.org/r/64283/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/5e008ff7
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/5e008ff7
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/5e008ff7

Branch: refs/heads/master
Commit: 5e008ff7fe7f5701e1baf2051c2873f655ca7aed
Parents: cea43db
Author: Bill Farner <wf...@apache.org>
Authored: Sun Dec 3 06:50:45 2017 -0800
Committer: Bill Farner <wf...@apache.org>
Committed: Sun Dec 3 06:50:45 2017 -0800

----------------------------------------------------------------------
 .../scheduler/storage/log/LogPersistence.java   |   5 +-
 .../scheduler/storage/log/StreamManager.java    |   9 +-
 .../storage/log/StreamManagerImpl.java          | 133 ++---------------
 .../storage/log/StreamTransaction.java          |  40 -----
 .../scheduler/storage/log/LogManagerTest.java   | 149 +------------------
 5 files changed, 21 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
index a0a6b6c..e70e605 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/LogPersistence.java
@@ -18,6 +18,7 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
@@ -99,10 +100,8 @@ class LogPersistence implements Persistence, DistributedSnapshotStore {
 
   @Override
   public void persist(Stream<Op> mutations) throws PersistenceException {
-    StreamTransaction transaction = streamManager.startTransaction();
-    mutations.forEach(transaction::add);
     try {
-      transaction.commit();
+      streamManager.commit(mutations.collect(Collectors.toList()));
     } catch (CodingException e) {
       throw new PersistenceException(e);
     }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
index 18da32d..73602cb 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManager.java
@@ -14,8 +14,10 @@
 package org.apache.aurora.scheduler.storage.log;
 
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.aurora.gen.storage.LogEntry;
+import org.apache.aurora.gen.storage.Op;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.scheduler.log.Log;
 
@@ -52,12 +54,11 @@ public interface StreamManager {
   void truncateBefore(Log.Position position);
 
   /**
-   * Starts a transaction that can be used to commit a series of ops to the log stream atomically.
+   * Saves operations to the log stream.
    *
-   * @return StreamTransaction A transaction manager to handle batching up commits to the
-   *    underlying stream.
+   * @param mutations Operations to save.
    */
-  StreamTransaction startTransaction();
+  void commit(List<Op> mutations);
 
   /**
    * Adds a snapshot to the log and if successful, truncates the log entries preceding the

http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index c5b107f..9eb309a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -15,33 +15,24 @@ package org.apache.aurora.scheduler.storage.log;
 
 import java.util.Arrays;
 import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 import javax.inject.Inject;
 
-import com.google.common.base.Preconditions;
 import com.google.common.collect.AbstractIterator;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Maps;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.primitives.Bytes;
 import com.google.inject.assistedinject.Assisted;
 
 import org.apache.aurora.common.stats.Stats;
-import org.apache.aurora.gen.ScheduledTask;
 import org.apache.aurora.gen.storage.Frame;
 import org.apache.aurora.gen.storage.FrameHeader;
 import org.apache.aurora.gen.storage.LogEntry;
 import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.gen.storage.RemoveTasks;
-import org.apache.aurora.gen.storage.SaveHostAttributes;
-import org.apache.aurora.gen.storage.SaveTasks;
 import org.apache.aurora.gen.storage.Snapshot;
 import org.apache.aurora.gen.storage.Transaction;
 import org.apache.aurora.gen.storage.storageConstants;
@@ -193,8 +184,16 @@ class StreamManagerImpl implements StreamManager {
   }
 
   @Override
-  public StreamTransactionImpl startTransaction() {
-    return new StreamTransactionImpl();
+  public void commit(List<Op> mutations) {
+    if (mutations.isEmpty()) {
+      return;
+    }
+
+    Transaction transaction = new Transaction()
+        .setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION)
+        .setOps(mutations);
+    appendAndGetPosition(LogEntry.transaction(transaction));
+    vars.unSnapshottedTransactions.incrementAndGet();
   }
 
   @Override
@@ -235,114 +234,4 @@ class StreamManagerImpl implements StreamManager {
     vars.entriesWritten.incrementAndGet();
     return firstPosition;
   }
-
-  final class StreamTransactionImpl implements StreamTransaction {
-    private final Transaction transaction =
-        new Transaction().setSchemaVersion(storageConstants.CURRENT_SCHEMA_VERSION);
-    private final AtomicBoolean committed = new AtomicBoolean(false);
-
-    StreamTransactionImpl() {
-      // supplied by factory method
-    }
-
-    @Override
-    public Log.Position commit() throws CodingException {
-      Preconditions.checkState(!committed.getAndSet(true),
-          "Can only call commit once per transaction.");
-
-      if (!transaction.isSetOps()) {
-        return null;
-      }
-
-      Log.Position position = appendAndGetPosition(LogEntry.transaction(transaction));
-      vars.unSnapshottedTransactions.incrementAndGet();
-      return position;
-    }
-
-    @Override
-    public void add(Op op) {
-      Preconditions.checkState(!committed.get());
-
-      Op prior = transaction.isSetOps() ? Iterables.getLast(transaction.getOps(), null) : null;
-      if (prior == null || !coalesce(prior, op)) {
-        transaction.addToOps(op);
-      }
-    }
-
-    /**
-     * Tries to coalesce a new op into the prior to compact the binary representation and increase
-     * batching.
-     *
-     * @param prior The previous op.
-     * @param next The next op to be added.
-     * @return {@code true} if the next op was coalesced into the prior, {@code false} otherwise.
-     */
-    private boolean coalesce(Op prior, Op next) {
-      if (!prior.isSet() && !next.isSet()) {
-        return false;
-      }
-
-      Op._Fields priorType = prior.getSetField();
-      if (!priorType.equals(next.getSetField())) {
-        return false;
-      }
-
-      switch (priorType) {
-        case SAVE_FRAMEWORK_ID:
-          prior.setSaveFrameworkId(next.getSaveFrameworkId());
-          return true;
-        case SAVE_TASKS:
-          coalesce(prior.getSaveTasks(), next.getSaveTasks());
-          return true;
-        case REMOVE_TASKS:
-          coalesce(prior.getRemoveTasks(), next.getRemoveTasks());
-          return true;
-        case SAVE_HOST_ATTRIBUTES:
-          return coalesce(prior.getSaveHostAttributes(), next.getSaveHostAttributes());
-        default:
-          return false;
-      }
-    }
-
-    private void coalesce(SaveTasks prior, SaveTasks next) {
-      if (next.isSetTasks()) {
-        if (prior.isSetTasks()) {
-          // It is an expected invariant that an operation may reference a task (identified by
-          // task ID) no more than one time.  Therefore, to coalesce two SaveTasks operations,
-          // the most recent task definition overrides the prior operation.
-          Map<String, ScheduledTask> coalesced = Maps.newHashMap();
-          for (ScheduledTask task : prior.getTasks()) {
-            coalesced.put(task.getAssignedTask().getTaskId(), task);
-          }
-          for (ScheduledTask task : next.getTasks()) {
-            coalesced.put(task.getAssignedTask().getTaskId(), task);
-          }
-          prior.setTasks(ImmutableSet.copyOf(coalesced.values()));
-        } else {
-          prior.setTasks(next.getTasks());
-        }
-      }
-    }
-
-    private void coalesce(RemoveTasks prior, RemoveTasks next) {
-      if (next.isSetTaskIds()) {
-        if (prior.isSetTaskIds()) {
-          prior.setTaskIds(ImmutableSet.<String>builder()
-              .addAll(prior.getTaskIds())
-              .addAll(next.getTaskIds())
-              .build());
-        } else {
-          prior.setTaskIds(next.getTaskIds());
-        }
-      }
-    }
-
-    private boolean coalesce(SaveHostAttributes prior, SaveHostAttributes next) {
-      if (prior.getHostAttributes().getHost().equals(next.getHostAttributes().getHost())) {
-        prior.getHostAttributes().setAttributes(next.getHostAttributes().getAttributes());
-        return true;
-      }
-      return false;
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
deleted file mode 100644
index a51fd18..0000000
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamTransaction.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.aurora.scheduler.storage.log;
-
-import org.apache.aurora.codec.ThriftBinaryCodec;
-import org.apache.aurora.gen.storage.Op;
-import org.apache.aurora.scheduler.log.Log;
-
-/**
- * Manages a single log stream append transaction.  Local storage ops can be added to the
- * transaction and then later committed as an atomic unit.
- */
-interface StreamTransaction {
-  /**
-   * Appends any ops that have been added to this transaction to the log stream in a single
-   * atomic record.
-   *
-   * @return The position of the log entry committed in this transaction, if any.
-   * @throws CodingException If there was a problem encoding a log entry for commit.
-   */
-  Log.Position commit() throws ThriftBinaryCodec.CodingException;
-
-  /**
-   * Adds a local storage operation to this transaction.
-   *
-   * @param op The local storage op to add.
-   */
-  void add(Op op);
-}

http://git-wip-us.apache.org/repos/asf/aurora/blob/5e008ff7/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
index cb38f10..4d210b2 100644
--- a/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
+++ b/src/test/java/org/apache/aurora/scheduler/storage/log/LogManagerTest.java
@@ -16,13 +16,8 @@ package org.apache.aurora.scheduler.storage.log;
 import java.nio.ByteBuffer;
 import java.security.MessageDigest;
 import java.util.Collections;
-import java.util.Deque;
-import java.util.Iterator;
 import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingDeque;
 
-import com.google.common.base.Function;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
@@ -61,25 +56,13 @@ import org.junit.Test;
 
 import static org.apache.aurora.scheduler.storage.log.SnapshotDeduplicator.SnapshotDeduplicatorImpl;
 import static org.easymock.EasyMock.expect;
-import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotSame;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertSame;
 
 public class LogManagerTest extends EasyMockTest {
 
   private static final Amount<Integer, Data> NO_FRAMES_EVER_SIZE =
       Amount.of(Integer.MAX_VALUE, Data.GB);
 
-  private static final Function<LogEntry, byte[]> ENCODER = entry -> {
-    try {
-      return encode(entry);
-    } catch (CodingException e) {
-      throw new RuntimeException(e);
-    }
-  };
-
   private Stream stream;
   private Position position1;
   private Position position2;
@@ -139,41 +122,10 @@ public class LogManagerTest extends EasyMockTest {
   }
 
   @Test
-  public void testStreamManagerSuccessiveCommits() throws CodingException {
-    control.replay();
-
-    StreamManager streamManager = createNoMessagesStreamManager();
-    StreamTransaction streamTransaction = streamManager.startTransaction();
-    streamTransaction.commit();
-
-    assertNotSame("Expected a new transaction to be started after a commit",
-        streamTransaction, streamManager.startTransaction());
-  }
-
-  @Test
   public void testTransactionEmpty() throws CodingException {
     control.replay();
 
-    Position position = createNoMessagesStreamManager().startTransaction().commit();
-    assertNull(position);
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testTransactionDoubleCommit() throws CodingException {
-    control.replay();
-
-    StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction();
-    streamTransaction.commit();
-    streamTransaction.commit();
-  }
-
-  @Test(expected = IllegalStateException.class)
-  public void testTransactionAddAfterCommit() throws CodingException {
-    control.replay();
-
-    StreamTransaction streamTransaction = createNoMessagesStreamManager().startTransaction();
-    streamTransaction.commit();
-    streamTransaction.add(Op.saveFrameworkId(new SaveFrameworkId("don't allow this")));
+    createNoMessagesStreamManager().commit(ImmutableList.of());
   }
 
   private static class LogEntryMatcher implements IArgumentMatcher {
@@ -228,12 +180,7 @@ public class LogManagerTest extends EasyMockTest {
     StreamManager streamManager = createNoMessagesStreamManager();
     control.replay();
 
-    StreamTransaction transaction = streamManager.startTransaction();
-    transaction.add(saveFrameworkId);
-    transaction.add(deleteJob);
-
-    Position position = transaction.commit();
-    assertSame(position1, position);
+    streamManager.commit(ImmutableList.of(saveFrameworkId, deleteJob));
   }
 
   static class Message {
@@ -281,97 +228,7 @@ public class LogManagerTest extends EasyMockTest {
     StreamManager streamManager = createStreamManager(message.chunkSize);
     control.replay();
 
-    StreamTransaction transaction = streamManager.startTransaction();
-    transaction.add(saveFrameworkId);
-
-    Position position = transaction.commit();
-    assertSame(position1, position);
-  }
-
-  @Test
-  public void testConcurrentWrites() throws Exception {
-    control.replay(); // No easymock expectations used here
-
-    Op op1 = Op.removeJob(new RemoveJob(JobKeys.from("r1", "env", "name").newBuilder()));
-    final Op op2 = Op.removeJob(new RemoveJob(JobKeys.from("r2", "env", "name").newBuilder()));
-
-    LogEntry transaction1 = createLogEntry(op1);
-    LogEntry transaction2 = createLogEntry(op2);
-
-    final CountDownLatch message1Started = new CountDownLatch(1);
-
-    Message message1 = frame(transaction1);
-    Message message2 = frame(transaction2);
-
-    List<byte[]> expectedAppends =
-        ImmutableList.<byte[]>builder()
-            .add(encode(message1.header))
-            .addAll(Iterables.transform(message1.chunks, ENCODER))
-            .add(encode(message2.header))
-            .addAll(Iterables.transform(message2.chunks, ENCODER))
-            .build();
-
-    final Deque<byte[]> actualAppends = new LinkedBlockingDeque<>();
-
-    Stream mockStream = new Stream() {
-      @Override
-      public Position append(byte[] contents) throws StreamAccessException {
-        actualAppends.addLast(contents);
-        message1Started.countDown();
-        try {
-          // If a chunked message is not properly serialized to the log, this sleep all but ensures
-          // interleaved chunk writes and a test failure.
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          throw new RuntimeException(e);
-        }
-        return null;
-      }
-
-      @Override
-      public Iterator<Entry> readAll() throws InvalidPositionException, StreamAccessException {
-        throw new UnsupportedOperationException();
-      }
-
-      @Override
-      public void truncateBefore(Position position)
-          throws InvalidPositionException, StreamAccessException {
-        throw new UnsupportedOperationException();
-      }
-    };
-
-    final StreamManagerImpl streamManager = new StreamManagerImpl(
-        mockStream,
-        new EntrySerializer.EntrySerializerImpl(message1.chunkSize, Hashing.md5()),
-        Hashing.md5(),
-        new SnapshotDeduplicatorImpl());
-    StreamTransaction tr1 = streamManager.startTransaction();
-    tr1.add(op1);
-
-    Thread snapshotThread = new Thread() {
-      @Override
-      public void run() {
-        StreamTransaction tr2 = streamManager.startTransaction();
-        tr2.add(op2);
-        try {
-          message1Started.await();
-          tr2.commit();
-        } catch (CodingException | InterruptedException e) {
-          throw new RuntimeException(e);
-        }
-      }
-    };
-    snapshotThread.setDaemon(true);
-    snapshotThread.start();
-
-    tr1.commit();
-
-    snapshotThread.join();
-
-    assertEquals(expectedAppends.size(), actualAppends.size());
-    for (byte[] expectedData : expectedAppends) {
-      assertArrayEquals(expectedData, actualAppends.removeFirst());
-    }
+    streamManager.commit(ImmutableList.of(saveFrameworkId));
   }
 
   @Test