You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@aurora.apache.org by ke...@apache.org on 2014/11/21 22:16:14 UTC

incubator-aurora git commit: Return an iterable of frame chunks.

Repository: incubator-aurora
Updated Branches:
  refs/heads/master 6f92724fd -> 91accd62f


Return an iterable of frame chunks.

Stream frame chunks instead of preallocating them.

This avoids allocating an entire additional snapshot's worth
of heap during entry serialization, reducing the overall heap
impact of the serializer from 2*sizeof(serialized-entry) to
sizeof(serialized-entry)+chunkSize.

Further optimizations out-of-scope for this change:

* Make the returned iterator reuse and mutate a single fixed-size
  buffer (to reduce GC pressure).
* Change the log format so that FrameHeader doesn't need to know the
  size and checksum of the serialized data ahead of time (possibly by
  writing them as a trailer instead).

Testing Done:
./gradlew -Pq build

Bugs closed: AURORA-930

Reviewed at https://reviews.apache.org/r/28306/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/91accd62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/91accd62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/91accd62

Branch: refs/heads/master
Commit: 91accd62f81b823aeeea35bc4b600101d8edbe8b
Parents: 6f92724
Author: Kevin Sweeney <ke...@apache.org>
Authored: Fri Nov 21 13:16:06 2014 -0800
Committer: Kevin Sweeney <ke...@apache.org>
Committed: Fri Nov 21 13:16:06 2014 -0800

----------------------------------------------------------------------
 .../scheduler/storage/log/EntrySerializer.java  | 63 +++++++++++++++-----
 .../storage/log/StreamManagerImpl.java          |  2 +-
 .../aurora/scheduler/app/SchedulerIT.java       |  2 +-
 3 files changed, 50 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91accd62/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
index f4fa1cb..40c510a 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/EntrySerializer.java
@@ -14,10 +14,14 @@
 package org.apache.aurora.scheduler.storage.log;
 
 import java.nio.ByteBuffer;
+import java.util.Iterator;
 
 import javax.inject.Inject;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Throwables;
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.ImmutableList;
 import com.google.common.hash.HashFunction;
 import com.twitter.common.inject.TimedInterceptor.Timed;
 import com.twitter.common.quantity.Amount;
@@ -39,13 +43,14 @@ import static org.apache.aurora.scheduler.storage.log.LogManager.MaxEntrySize;
  */
 public interface EntrySerializer {
   /**
-   * Serializes a log entry and splits it into chunks no larger than {@code maxEntrySizeBytes}.
+   * Serializes a log entry and splits it into chunks no larger than {@code maxEntrySizeBytes}. The
+   * returned iterable's iterator is not thread-safe and may throw unchecked exceptions.
    *
    * @param logEntry The log entry to serialize.
    * @return Serialized and chunked log entry.
    * @throws CodingException If the entry could not be serialized.
    */
-  byte[][] serialize(LogEntry logEntry) throws CodingException;
+  Iterable<byte[]> serialize(LogEntry logEntry) throws CodingException;
 
   @VisibleForTesting
   class EntrySerializerImpl implements EntrySerializer {
@@ -64,23 +69,51 @@ public interface EntrySerializer {
 
     @Override
     @Timed("log_entry_serialize")
-    public byte[][] serialize(LogEntry logEntry) throws CodingException {
-      byte[] entry = Entries.thriftBinaryEncode(logEntry);
+    public Iterable<byte[]> serialize(LogEntry logEntry) throws CodingException {
+      final byte[] entry = Entries.thriftBinaryEncode(logEntry); // Encoded eagerly.
       if (entry.length <= maxEntrySizeBytes) {
-        return new byte[][] {entry};
+        return ImmutableList.of(entry); // Fits in one frame: no header, no chunking.
       }
 
-      int chunks = (int) Math.ceil(entry.length / (double) maxEntrySizeBytes);
-      byte[][] frames = new byte[chunks + 1][];
+      final int chunks = (int) Math.ceil(entry.length / (double) maxEntrySizeBytes);
 
-      frames[0] = encode(Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry)))));
-      for (int i = 0; i < chunks; i++) {
-        int offset = i * maxEntrySizeBytes;
-        ByteBuffer chunk =
-            ByteBuffer.wrap(entry, offset, Math.min(maxEntrySizeBytes, entry.length - offset));
-        frames[i + 1] = encode(Frame.chunk(new FrameChunk(chunk)));
-      }
-      return frames;
+      final byte[] header = encode(
+          Frame.header(new FrameHeader(chunks, ByteBuffer.wrap(checksum(entry)))));
+      // Chunk frames are not built here; they are encoded lazily as an iterator advances.
+      return new Iterable<byte[]>() { // Each iterator() call restarts from the header.
+        @Override
+        public Iterator<byte[]> iterator() {
+          return streamFrames(header, chunks, entry);
+        }
+      };
+    }
+
+    Iterator<byte[]> streamFrames(final byte[] header, final int chunks, final byte[] entry) {
+      return new AbstractIterator<byte[]>() { // Yields the header, then each chunk frame.
+        private int i = -1; // -1: header is next; otherwise the index of the next chunk.
+        // Encodes at most one chunk frame per call; previously returned frames are not retained.
+        @Override
+        protected byte[] computeNext() {
+          byte[] result;
+          if (i == -1) {
+            result = header;
+          } else if (i < chunks) {
+            int offset = i * maxEntrySizeBytes;
+            ByteBuffer chunk =
+                ByteBuffer.wrap(entry, offset, Math.min(maxEntrySizeBytes, entry.length - offset));
+            try {
+              result = encode(Frame.chunk(new FrameChunk(chunk)));
+            } catch (CodingException e) { // Iterator methods can't throw checked exceptions.
+              throw Throwables.propagate(e);
+            }
+          } else {
+            return endOfData(); // Header and all chunks have been emitted.
+          }
+          // Advance only after a frame was produced successfully.
+          i++;
+          return result;
+        }
+      };
     }
 
     @Timed("log_entry_checksum")

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91accd62/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
----------------------------------------------------------------------
diff --git a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
index cb95d89..ee88d7c 100644
--- a/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
+++ b/src/main/java/org/apache/aurora/scheduler/storage/log/StreamManagerImpl.java
@@ -233,7 +233,7 @@ class StreamManagerImpl implements StreamManager {
   @Timed("log_manager_append")
   protected Log.Position appendAndGetPosition(LogEntry logEntry) throws CodingException {
     Log.Position firstPosition = null;
-    byte[][] entries = entrySerializer.serialize(logEntry);
+    Iterable<byte[]> entries = entrySerializer.serialize(logEntry);
     synchronized (writeMutex) { // ensure all sub-entries are written as a unit
       for (byte[] entry : entries) {
         Log.Position position = stream.append(entry);

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/91accd62/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
----------------------------------------------------------------------
diff --git a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
index c903894..bf6cfad 100644
--- a/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
+++ b/src/test/java/org/apache/aurora/scheduler/app/SchedulerIT.java
@@ -285,7 +285,7 @@ public class SchedulerIT extends BaseZooKeeperTest {
               @Override
               public byte[] contents() {
                 try {
-                  return entrySerializer.serialize(entry)[0];
+                  return Iterables.getFirst(entrySerializer.serialize(entry), null);
                 } catch (CodingException e) {
                   throw Throwables.propagate(e);
                 }