You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/11/07 21:16:58 UTC

apex-malhar git commit: APEXMALHAR-2320 #resolve #comment use SerializationBuffer to implement FSWindowDataManager.toSlice()

Repository: apex-malhar
Updated Branches:
  refs/heads/master 3f973043f -> 8e3a0c0b0


APEXMALHAR-2320 #resolve #comment use SerializationBuffer to implement FSWindowDataManager.toSlice()


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8e3a0c0b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8e3a0c0b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8e3a0c0b

Branch: refs/heads/master
Commit: 8e3a0c0b09ff2b95d7792e69b573d85a29515cb3
Parents: 3f97304
Author: brightchen <br...@datatorrent.com>
Authored: Mon Oct 31 16:50:07 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Mon Nov 7 13:03:07 2016 -0800

----------------------------------------------------------------------
 .../malhar/lib/wal/FSWindowDataManager.java     | 26 +++++++++++++-------
 1 file changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e3a0c0b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
index 6e8774e..4db5e17 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
@@ -18,7 +18,6 @@
  */
 package org.apache.apex.malhar.lib.wal;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -38,6 +37,8 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager;
 import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
@@ -45,7 +46,6 @@ import org.apache.hadoop.fs.RemoteIterator;
 
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableSet;
@@ -149,6 +149,8 @@ public class FSWindowDataManager implements WindowDataManager
 
   private transient FileContext fileContext;
 
+  private transient SerializationBuffer serializationBuffer;
+
   public FSWindowDataManager()
   {
     kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
@@ -157,6 +159,7 @@ public class FSWindowDataManager implements WindowDataManager
   @Override
   public void setup(Context.OperatorContext context)
   {
+    serializationBuffer = new SerializationBuffer(new WindowedBlockStream());
     operatorId = context.getId();
 
     if (isStatePathRelativeToAppPath) {
@@ -416,7 +419,17 @@ public class FSWindowDataManager implements WindowDataManager
 
     byte[] windowIdBytes = Longs.toByteArray(windowId);
     writer.append(new Slice(windowIdBytes));
+
+    /**
+     * writer.append() copies the data into the file output stream, so the data in the buffer is no longer
+     * needed after the call and it is safe to reset the serializationBuffer.
+     *
+     * Because the data in the stream memory can be cleared all at once, there is no need to separate it by
+     * window, so beginWindow() and endWindow() don't need to be called.
+     */
     writer.append(toSlice(object));
+    serializationBuffer.reset();
+
     wal.beforeCheckpoint(windowId);
     wal.windowWalParts.put(windowId, writer.getCurrentPointer().getPartNum());
     writer.rotateIfNecessary();
@@ -594,13 +607,8 @@ public class FSWindowDataManager implements WindowDataManager
 
   private Slice toSlice(Object object)
   {
-    ByteArrayOutputStream baos = new ByteArrayOutputStream();
-    Output output = new Output(baos);
-    kryo.writeClassAndObject(output, object);
-    output.close();
-    byte[] bytes = baos.toByteArray();
-
-    return new Slice(bytes);
+    kryo.writeClassAndObject(serializationBuffer, object);
+    return serializationBuffer.toSlice();
   }
 
   protected Object fromSlice(Slice slice)