You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/11/07 21:16:58 UTC
apex-malhar git commit: APEXMALHAR-2320 #resolve #comment use
SerializationBuffer implement FSWindowDataManager.toSlice()
Repository: apex-malhar
Updated Branches:
refs/heads/master 3f973043f -> 8e3a0c0b0
APEXMALHAR-2320 #resolve #comment use SerializationBuffer implement FSWindowDataManager.toSlice()
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/8e3a0c0b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/8e3a0c0b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/8e3a0c0b
Branch: refs/heads/master
Commit: 8e3a0c0b09ff2b95d7792e69b573d85a29515cb3
Parents: 3f97304
Author: brightchen <br...@datatorrent.com>
Authored: Mon Oct 31 16:50:07 2016 -0700
Committer: brightchen <br...@datatorrent.com>
Committed: Mon Nov 7 13:03:07 2016 -0800
----------------------------------------------------------------------
.../malhar/lib/wal/FSWindowDataManager.java | 26 +++++++++++++-------
1 file changed, 17 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8e3a0c0b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
index 6e8774e..4db5e17 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/wal/FSWindowDataManager.java
@@ -18,7 +18,6 @@
*/
package org.apache.apex.malhar.lib.wal;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -38,6 +37,8 @@ import org.slf4j.LoggerFactory;
import org.apache.apex.malhar.lib.state.managed.IncrementalCheckpointManager;
import org.apache.apex.malhar.lib.utils.FileContextUtils;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -45,7 +46,6 @@ import org.apache.hadoop.fs.RemoteIterator;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
@@ -149,6 +149,8 @@ public class FSWindowDataManager implements WindowDataManager
private transient FileContext fileContext;
+ private transient SerializationBuffer serializationBuffer;
+
public FSWindowDataManager()
{
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
@@ -157,6 +159,7 @@ public class FSWindowDataManager implements WindowDataManager
@Override
public void setup(Context.OperatorContext context)
{
+ serializationBuffer = new SerializationBuffer(new WindowedBlockStream());
operatorId = context.getId();
if (isStatePathRelativeToAppPath) {
@@ -416,7 +419,17 @@ public class FSWindowDataManager implements WindowDataManager
byte[] windowIdBytes = Longs.toByteArray(windowId);
writer.append(new Slice(windowIdBytes));
+
+ /**
+ * writer.append() will copy the data to the file output stream.
+ * So the data in the buffer is not needed any more, and it is safe to reset the serializationBuffer.
+ *
+ * And as the data in stream memory can be cleaned all at once. So don't need to separate data by different windows,
+ * so beginWindow() and endWindow() don't need to be called
+ */
writer.append(toSlice(object));
+ serializationBuffer.reset();
+
wal.beforeCheckpoint(windowId);
wal.windowWalParts.put(windowId, writer.getCurrentPointer().getPartNum());
writer.rotateIfNecessary();
@@ -594,13 +607,8 @@ public class FSWindowDataManager implements WindowDataManager
private Slice toSlice(Object object)
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- Output output = new Output(baos);
- kryo.writeClassAndObject(output, object);
- output.close();
- byte[] bytes = baos.toByteArray();
-
- return new Slice(bytes);
+ kryo.writeClassAndObject(serializationBuffer, object);
+ return serializationBuffer.toSlice();
}
protected Object fromSlice(Slice slice)