You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/24 20:46:36 UTC
[2/3] apex-malhar git commit: APEXMALHAR-2190 Use reusable buffer for
serialization in spillable data structures closes #404
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
new file mode 100644
index 0000000..57638d8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.managed.Bucket;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * All spillable data structures use this class to manage the buffers for serialization.
+ * This class contains serialization logic that is common for all spillable data structures
+ *
+ * @param <K>
+ * @param <V>
+ */
+public class AffixKeyValueSerdeManager<K, V> extends KeyValueSerdeManager<K, V>
+{
+ /**
+ * The read buffer will be released when read is done, while write buffer should be held until the data has been persisted.
+ * The write buffer should be non-transient. The data which has been already saved to files will be removed by {@link Bucket}
+ * while the data which haven't been saved need to be recovered by the platform from checkpoint.
+ */
+ private AffixSerde<K> metaKeySerde;
+ private AffixSerde<K> dataKeySerde;
+
+
+ private AffixKeyValueSerdeManager()
+ {
+ //for kyro
+ }
+
+ public AffixKeyValueSerdeManager(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde<K> keySerde, Serde<V> valueSerde)
+ {
+ this.valueSerde = valueSerde;
+ metaKeySerde = new AffixSerde<K>(null, keySerde, metaKeySuffix);
+ dataKeySerde = new AffixSerde<K>(dataKeyIdentifier, keySerde, null);
+ }
+
+ public Slice serializeMetaKey(K key, boolean write)
+ {
+ SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;
+ metaKeySerde.serialize(key, buffer);
+ return buffer.toSlice();
+ }
+
+ public Slice serializeDataKey(K key, boolean write)
+ {
+ SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;
+ dataKeySerde.serialize(key, buffer);
+ return buffer.toSlice();
+ }
+
+ public V deserializeValue(Input input)
+ {
+ V value = valueSerde.deserialize(input);
+ return value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
new file mode 100644
index 0000000..7504633
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * AffixSerde provides serde for adding prefix or suffix
+ *
+ * @param <T>
+ */
+public class AffixSerde<T> implements Serde<T>
+{
+ private Serde<T> serde;
+ private byte[] prefix;
+ private byte[] suffix;
+
+ private AffixSerde()
+ {
+ //kyro
+ }
+
+ public AffixSerde(byte[] prefix, Serde<T> serde, byte[] suffix)
+ {
+ this.prefix = prefix;
+ this.suffix = suffix;
+ this.serde = serde;
+ }
+
+ @Override
+ public void serialize(T object, Output output)
+ {
+ if (prefix != null && prefix.length > 0) {
+ output.write(prefix);
+ }
+ serde.serialize(object, output);
+ if (suffix != null && suffix.length > 0) {
+ output.write(suffix);
+ }
+ }
+
+ @Override
+ public T deserialize(Input input)
+ {
+ if (prefix != null && prefix.length > 0) {
+ input.skip(prefix.length);
+ }
+ return serde.deserialize(input);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
new file mode 100644
index 0000000..4b2a45b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.lang.reflect.Array;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+
+public class ArraySerde<T> implements Serde<T[]>
+{
+ private Serde<T> itemSerde;
+ private Class<T> itemType;
+
+ private ArraySerde()
+ {
+ }
+
+ /**
+ * Serializer and Deserializer need different constructor, so use static factory method to wrap.
+ * The ArraySerde returned by newSerializer can only used for serialization
+ */
+ public static <T> ArraySerde<T> newSerializer(Serde<T> itemSerde)
+ {
+ return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde));
+ }
+
+ public static <T> ArraySerde<T> newSerde(Serde<T> itemSerde, Class<T> itemType)
+ {
+ return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde), Preconditions.checkNotNull(itemType));
+ }
+
+ private ArraySerde(Serde<T> itemSerde)
+ {
+ this.itemSerde = itemSerde;
+ }
+
+ private ArraySerde(Serde<T> itemSerde, Class<T> itemType)
+ {
+ this.itemSerde = itemSerde;
+ this.itemType = itemType;
+ }
+
+ @Override
+ public void serialize(T[] objects, Output output)
+ {
+ if (objects.length == 0) {
+ return;
+ }
+ output.writeInt(objects.length, true);
+ Serde<T> serializer = getItemSerde();
+ for (T object : objects) {
+ serializer.serialize(object, output);
+ }
+ }
+
+ protected Serde<T> getItemSerde()
+ {
+ return itemSerde;
+ }
+
+ @Override
+ public T[] deserialize(Input input)
+ {
+ int numOfElements = input.readInt(true);
+
+ T[] array = createObjectArray(numOfElements);
+
+ for (int index = 0; index < numOfElements; ++index) {
+ array[index] = getItemSerde().deserialize(input);
+ }
+ return array;
+ }
+
+ @SuppressWarnings("unchecked")
+ protected T[] createObjectArray(int length)
+ {
+ return (T[])Array.newInstance(itemType, length);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
new file mode 100644
index 0000000..c140962
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ *
+ * keep the information of one block
+ *
+ */
+public class Block
+{
+ public static class OutOfBlockBufferMemoryException extends RuntimeException
+ {
+ private static final long serialVersionUID = 3813792889200989131L;
+ }
+
+ private static final Logger logger = LoggerFactory.getLogger(Block.class);
+
+ public static final int DEFAULT_BLOCK_SIZE = 100000;
+
+ //the capacity of the block
+ private int capacity;
+
+ /*
+ * the size of the data.
+ */
+ private volatile int size;
+
+ private int objectBeginOffset = 0;
+ private byte[] buffer;
+
+ /**
+ * whether any slices have been exposed to the caller.
+ */
+ private boolean exposedSlices;
+
+ private Block()
+ {
+ this(DEFAULT_BLOCK_SIZE);
+ }
+
+ public Block(int capacity)
+ {
+ if (capacity <= 0) {
+ throw new IllegalArgumentException("Invalid capacity: " + capacity);
+ }
+ buffer = new byte[capacity];
+ this.capacity = capacity;
+ }
+
+ public void write(byte data)
+ {
+ checkOrReallocateBuffer(1);
+ buffer[size++] = data;
+ }
+
+ public void write(byte[] data)
+ {
+ write(data, 0, data.length);
+ }
+
+ public void write(byte[] data, final int offset, final int length)
+ {
+ checkOrReallocateBuffer(length);
+
+ System.arraycopy(data, offset, buffer, size, length);
+ size += length;
+ }
+
+
+
+ /**
+ * check the buffer size and reallocate if buffer is not enough
+ *
+ * @param length
+ */
+ private void checkOrReallocateBuffer(int length) throws OutOfBlockBufferMemoryException
+ {
+ if (size + length <= capacity) {
+ return;
+ }
+
+ if (exposedSlices) {
+ throw new OutOfBlockBufferMemoryException();
+ }
+
+ //calculate the new capacity
+ capacity = (size + length) * 2;
+
+ byte[] oldBuffer = buffer;
+ buffer = new byte[capacity];
+
+ /**
+ * no slices are exposed in this block yet (this is the first object in this block).
+ * so we can reallocate and move the memory
+ */
+ if (size > 0) {
+ System.arraycopy(oldBuffer, 0, buffer, 0, size);
+ }
+ }
+
+ /**
+ * Similar to toSlice, this method is used to get the information of the
+ * object regards the data already write to buffer. But unlike toSlice() which
+ * indicates all the writes of this object are already done, this method can be called at
+ * any time
+ */
+ public Slice getLastObjectSlice()
+ {
+ return new Slice(buffer, objectBeginOffset, size - objectBeginOffset);
+ }
+
+ public void discardLastObjectData()
+ {
+ if (objectBeginOffset == 0) {
+ return;
+ }
+ size = objectBeginOffset;
+ }
+
+ public void moveLastObjectDataTo(Block newBlock)
+ {
+ if (size > objectBeginOffset) {
+ newBlock.write(buffer, objectBeginOffset, size - objectBeginOffset);
+ discardLastObjectData();
+ }
+ }
+
+ /**
+ * This method returns the slice that represents the serialized form.
+ * The process of serializing an object should be one or multiple calls of write() followed by a toSlice() call.
+ * A call to toSlice indicates the writes are done for this object
+ *
+ * @return
+ */
+ public BufferSlice toSlice()
+ {
+ if (size == objectBeginOffset) {
+ throw new RuntimeException("data size is zero.");
+ }
+ BufferSlice slice = new BufferSlice(buffer, objectBeginOffset, size - objectBeginOffset);
+ //prepare for next object
+ objectBeginOffset = size;
+ exposedSlices = true;
+ return slice;
+ }
+
+ public void reset()
+ {
+ size = 0;
+ objectBeginOffset = 0;
+ exposedSlices = false;
+ }
+
+ /**
+ * check if the block has enough space for the length
+ *
+ * @param length
+ * @return
+ */
+ public boolean hasEnoughSpace(int length)
+ {
+ return size + length < capacity;
+ }
+
+ public long size()
+ {
+ return size;
+ }
+
+ public long capacity()
+ {
+ return capacity;
+ }
+
+ public boolean isFresh()
+ {
+ return (size == 0 && objectBeginOffset == 0 && exposedSlices == false);
+ }
+
+ /**
+ * Returns whether the block is clear. The block is clear when there has not been any write calls since the last toSlice() call.
+ *
+ * @return
+ */
+ public boolean isClear()
+ {
+ return objectBeginOffset == size;
+ }
+
+ public void release()
+ {
+ reset();
+ buffer = null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
new file mode 100644
index 0000000..f8a097e
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
/**
 * Decides how many free blocks a stream may release. The expected interaction is:
 * <ol>
 * <li>the stream periodically (usually at the end of each window) reports how many free blocks it has, via
 * {@link #currentFreeBlocks(int)};</li>
 * <li>the stream asks how many blocks may be released, via {@link #getNumBlocksToRelease()}; it usually releases
 * that many, but it is free to make its own decision;</li>
 * <li>the stream reports how many blocks it actually released, via {@link #releasedBlocks(int)}.</li>
 * </ol>
 */
public interface BlockReleaseStrategy
{
  /**
   * Called by the stream to report how many blocks are currently free.
   *
   * @param freeBlockNum the number of free blocks
   */
  void currentFreeBlocks(int freeBlockNum);

  /**
   * Returns how many blocks may be released.
   *
   * @return the number of blocks that can be released
   */
  int getNumBlocksToRelease();

  /**
   * Called by the stream to report how many blocks it actually released.
   *
   * @param numReleasedBlocks the number of released blocks
   */
  void releasedBlocks(int numReleasedBlocks);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
new file mode 100644
index 0000000..ee50f7d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A stream is a collection of blocks
+ * BlockStream avoids copying the data that are already exposed to the caller
+ *
+ */
+public class BlockStream extends OutputStream
+{
+ private static final Logger logger = LoggerFactory.getLogger(BlockStream.class);
+
+ //the initial capacity of each block
+ protected final int blockCapacity;
+
+ protected Map<Integer, Block> blocks = Maps.newHashMap();
+ //the index of current block, valid block index should >= 0
+ protected int currentBlockIndex = 0;
+ protected long size = 0;
+
+ protected Block currentBlock;
+
+ public BlockStream()
+ {
+ this(Block.DEFAULT_BLOCK_SIZE);
+ }
+
+ public BlockStream(int blockCapacity)
+ {
+ this.blockCapacity = blockCapacity;
+ }
+
+ @Override
+ public void write(byte[] data)
+ {
+ write(data, 0, data.length);
+ }
+
+ @Override
+ public void write(int b)
+ {
+ currentBlock = getOrCreateCurrentBlock();
+ try {
+ currentBlock.write((byte)b);
+ } catch (Block.OutOfBlockBufferMemoryException e) {
+ reallocateBlock();
+ currentBlock.write((byte)b);
+ }
+ size++;
+ }
+
+ /**
+ * This write could be called multiple times for an object.
+ * The write method makes sure the same object only write to one block
+ *
+ * @param data
+ * @param offset
+ * @param length
+ */
+ @Override
+ public void write(byte[] data, final int offset, final int length)
+ {
+ //start with a block which at least can hold this data
+ currentBlock = getOrCreateCurrentBlock();
+ try {
+ currentBlock.write(data, offset, length);
+ } catch (Block.OutOfBlockBufferMemoryException e) {
+ reallocateBlock();
+ currentBlock.write(data, offset, length);
+ }
+ size += length;
+ }
+
+ private void reallocateBlock()
+ {
+ //use next block
+ Block previousBlock = moveToNextBlock();
+ if (!currentBlock.isFresh()) {
+ throw new RuntimeException("New block is not fresh.");
+ }
+ if (!previousBlock.isClear()) {
+ previousBlock.moveLastObjectDataTo(currentBlock);
+ }
+ }
+
+ /**
+ *
+ * @return The previous block
+ */
+ protected Block moveToNextBlock()
+ {
+ Block previousBlock = currentBlock;
+
+ ++currentBlockIndex;
+ currentBlock = getOrCreateCurrentBlock();
+ if (!currentBlock.isFresh()) {
+ throw new RuntimeException("Assigned non fresh block.");
+ }
+ return previousBlock;
+ }
+
+ protected Block getOrCreateCurrentBlock()
+ {
+ Block block = blocks.get(currentBlockIndex);
+ if (block == null) {
+ block = new Block(blockCapacity);
+ blocks.put(currentBlockIndex, block);
+ }
+ return block;
+ }
+
+ public long size()
+ {
+ return size;
+ }
+
+ public long capacity()
+ {
+ long capacity = 0;
+ for (Block block : blocks.values()) {
+ capacity += block.capacity();
+ }
+ return capacity;
+ }
+
+ /**
+ *
+ * this is the call that represents the end of an object
+ */
+ public Slice toSlice()
+ {
+ return blocks.get(currentBlockIndex).toSlice();
+ }
+
+ /**
+ * resets all blocks
+ */
+ public void reset()
+ {
+ currentBlockIndex = 0;
+ size = 0;
+ for (Block block : blocks.values()) {
+ block.reset();
+ }
+ }
+
+ public void release()
+ {
+ reset();
+ blocks.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
new file mode 100644
index 0000000..5d830fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.getopt.util.hash.MurmurHash;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * com.datatorrent.netlet.util.Slice has problem with the hashCode(), so
+ * override here
+ *
+ */
+public class BufferSlice extends Slice
+{
+ private static final long serialVersionUID = -471209532589983329L;
+ public static final BufferSlice EMPTY_SLICE = new BufferSlice(ArrayUtils.EMPTY_BYTE_ARRAY);
+
+ //for kyro
+ private BufferSlice()
+ {
+ //the super class's default constructor is private and can't called.
+ super(null, 0, 0);
+ }
+
+ public BufferSlice(byte[] array, int offset, int length)
+ {
+ super(array, offset, length);
+ }
+
+ public BufferSlice(byte[] array)
+ {
+ super(array);
+ }
+
+ public BufferSlice(Slice netletSlice)
+ {
+ this(netletSlice.buffer, netletSlice.offset, netletSlice.length);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ int hash = 5;
+ hash = 59 * hash + MurmurHash.hash(buffer, hash, offset, length);
+ hash = 59 * hash + this.length;
+ return hash;
+ }
+
+ /**
+ * let this class equals with com.datatorrent.netlet.util.Slice
+ */
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (obj == null) {
+ return false;
+ }
+ if (!Slice.class.isAssignableFrom(obj.getClass())) {
+ return false;
+ }
+ final Slice other = (Slice)obj;
+ if (this.length != other.length) {
+ return false;
+ }
+
+ final int offset1 = this.offset;
+ final byte[] buffer1 = this.buffer;
+ int i = offset1 + this.length;
+
+ final byte[] buffer2 = other.buffer;
+ int j = other.offset + other.length;
+
+ while (i-- > offset1) {
+ if (buffer1[i] != buffer2[--j]) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
new file mode 100644
index 0000000..bcd0b74
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes lists.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class CollectionSerde<T, CollectionT extends Collection<T>> implements Serde<CollectionT>
+{
+ @NotNull
+ private Serde<T> serde;
+
+ @NotNull
+ private Class<? extends CollectionT> collectionClass;
+
+ private CollectionSerde()
+ {
+ // for Kryo
+ }
+
+ /**
+ * Creates a {@link CollectionSerde}.
+ * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
+ */
+ public CollectionSerde(@NotNull Serde<T> serde, @NotNull Class<? extends CollectionT> collectionClass /*Class<? extends C1> collectionClass*/ )
+ {
+ this.serde = Preconditions.checkNotNull(serde);
+ this.collectionClass = Preconditions.checkNotNull(collectionClass);
+ }
+
+ @Override
+ public void serialize(CollectionT objects, Output output)
+ {
+ if (objects.size() == 0) {
+ return;
+ }
+ output.writeInt(objects.size(), true);
+ Serde<T> serializer = getItemSerde();
+ for (T object : objects) {
+ serializer.serialize(object, output);
+ }
+ }
+
+ @Override
+ public CollectionT deserialize(Input input)
+ {
+ int numElements = input.readInt(true);
+
+ try {
+ CollectionT collection = collectionClass.newInstance();
+
+ for (int index = 0; index < numElements; index++) {
+ T object = serde.deserialize(input);
+ collection.add(object);
+ }
+
+ return collection;
+ } catch (Exception ex) {
+ throw Throwables.propagate(ex);
+ }
+ }
+
+ protected Serde<T> getItemSerde()
+ {
+ return serde;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
new file mode 100644
index 0000000..93929e4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Arrays;
+
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+
+/**
+ * This implementation get the minimum number of free blocks in the period to release.
+ *
+ */
+public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
+{
+ public static final int DEFAULT_PERIOD = 60; // 60 reports
+ private CircularFifoBuffer freeBlockNumQueue;
+ private Integer[] tmpArray;
+
+ public DefaultBlockReleaseStrategy()
+ {
+ this(DEFAULT_PERIOD);
+ }
+
+ public DefaultBlockReleaseStrategy(int period)
+ {
+ freeBlockNumQueue = new CircularFifoBuffer(period);
+ tmpArray = new Integer[period];
+ Arrays.fill(tmpArray, 0);
+ }
+
+ /**
+ * The stream calls this to report to the strategy how many blocks are free currently.
+ * @param freeBlockNum
+ */
+ @Override
+ public void currentFreeBlocks(int freeBlockNum)
+ {
+ if (freeBlockNum < 0) {
+ throw new IllegalArgumentException("The number of free blocks could not less than zero.");
+ }
+ freeBlockNumQueue.add(freeBlockNum);
+ }
+
+ /**
+ * Get how many blocks that can be released
+ * @return
+ */
+ @Override
+ public int getNumBlocksToRelease()
+ {
+ int minNum = Integer.MAX_VALUE;
+ for (Object num : freeBlockNumQueue) {
+ minNum = Math.min((Integer)num, minNum);
+ }
+ return minNum;
+ }
+
+
+ /**
+ * report how many blocks that have been released.
+ * @param numReleasedBlocks
+ */
+ @Override
+ public void releasedBlocks(int numReleasedBlocks)
+ {
+ if (numReleasedBlocks == 0) {
+ return;
+ }
+ if (numReleasedBlocks < 0) {
+ throw new IllegalArgumentException("Num of released blocks should not be negative");
+ }
+ /**
+ * decrease by released blocks
+ */
+ for (Object num : freeBlockNumQueue) {
+ freeBlockNumQueue.add(Math.max((Integer)num - numReleasedBlocks, 0));
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
new file mode 100644
index 0000000..0fbb2ab
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
+ * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or
+ * incompatible changes to the class being serialized.
+ *
+ * @param <T> The type being serialized
+ */
+@InterfaceStability.Evolving
+public class GenericSerde<T> implements Serde<T>
+{
+ /**
+ * One Kryo instance per thread, created on first use and reused afterwards (Kryo instances are not
+ * thread-safe). Overriding initialValue() rather than get() is what makes the instance cached; static
+ * so nothing has to be re-created when this serde itself is deserialized.
+ */
+ private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
+ {
+ @Override
+ protected Kryo initialValue()
+ {
+ return new Kryo();
+ }
+ };
+
+ private final Class<? extends T> clazz;
+
+ /**
+ * Creates a serde that writes each object's class together with its data, so any type can be read back.
+ */
+ public GenericSerde()
+ {
+ this.clazz = null;
+ }
+
+ /**
+ * Creates a serde for one known class; only the object data is written.
+ *
+ * @param clazz the class of the objects read and written by this serde
+ */
+ public GenericSerde(Class<? extends T> clazz)
+ {
+ this.clazz = clazz;
+ }
+
+ /**
+ * Writes {@code object} to {@code output}, prefixed with its class when no class was given.
+ */
+ @Override
+ public void serialize(T object, Output output)
+ {
+ Kryo kryo = kryos.get();
+ if (clazz == null) {
+ kryo.writeClassAndObject(output, object);
+ } else {
+ kryo.writeObject(output, object);
+ }
+ }
+
+ /**
+ * Reads one object from {@code input}, in the format written by {@link #serialize(Object, Output)}.
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public T deserialize(Input input)
+ {
+ Kryo kryo = kryos.get();
+ if (clazz == null) {
+ // the class was written alongside the object; the caller's type parameter is trusted here
+ return (T)kryo.readClassAndObject(input);
+ }
+ return kryo.readObject(input, clazz);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
new file mode 100644
index 0000000..032b5e0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * This is an implementation of {@link Serde} which deserializes and serializes integers.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class IntSerde implements Serde<Integer>
+{
+ @Override
+ public void serialize(Integer value, Output output)
+ {
+ output.writeInt(value);
+ }
+
+ @Override
+ public Integer deserialize(Input input)
+ {
+ return input.readInt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
new file mode 100644
index 0000000..a7dfa7f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+/**
+ * Supplies the {@link WindowedBlockStream}s that hold serialized keys and serialized values.
+ * An implementation may hand out two separate streams, or the same stream for both.
+ */
+public interface KeyValueByteStreamProvider
+{
+ /**
+ * @return the stream that holds keys
+ */
+ WindowedBlockStream getKeyStream();
+
+ /**
+ * @return the stream that holds values
+ */
+ WindowedBlockStream getValueStream();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
new file mode 100644
index 0000000..6fbe9fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.managed.Bucket;
+import org.apache.apex.malhar.lib.state.managed.BucketProvider;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Serializes the keys and values of a single bucket into {@link SerializationBuffer}s and returns the
+ * serialized bytes as {@link Slice}s. Keys that will be written, and all values, use buffers bound to the
+ * bucket's key/value streams in {@link #setup(BucketProvider, long)}; keys serialized only for lookups use
+ * {@code SerializationBuffer.READ_BUFFER}.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class KeyValueSerdeManager<K, V>
+{
+ protected Serde<K> keySerde;
+ protected Serde<V> valueSerde;
+
+ /** Buffer for keys that are going to be written; bound to the bucket's key stream in setup. */
+ protected SerializationBuffer keyBufferForWrite;
+ /** Buffer for keys that are only serialized in order to look something up. */
+ protected transient SerializationBuffer keyBufferForRead = SerializationBuffer.READ_BUFFER;
+
+ /** Buffer for values; bound to the bucket's value stream in setup. */
+ protected SerializationBuffer valueBuffer;
+
+ protected KeyValueSerdeManager()
+ {
+ // no-arg constructor required by Kryo
+ }
+
+ /**
+ * @param keySerde serde used for keys
+ * @param valueSerde serde used for values
+ */
+ public KeyValueSerdeManager(Serde<K> keySerde, Serde<V> valueSerde)
+ {
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ }
+
+ /**
+ * Binds the write buffers to the streams of bucket {@code bucketId}. The bucket never changes for this
+ * class, so the streams are looked up once here instead of before every serialization.
+ *
+ * @param bp provider of the bucket
+ * @param bucketId id of the bucket whose streams are used
+ */
+ public void setup(BucketProvider bp, long bucketId)
+ {
+ Bucket bucket = bp.ensureBucket(bucketId);
+ this.valueBuffer = new SerializationBuffer(bucket.getValueStream());
+ this.keyBufferForWrite = new SerializationBuffer(bucket.getKeyStream());
+ }
+
+ /**
+ * Serializes a key.
+ *
+ * @param key the key to serialize
+ * @param write true to use the write buffer (the key will be stored), false to use the read buffer
+ * @return the serialized key
+ */
+ public Slice serializeKey(K key, boolean write)
+ {
+ SerializationBuffer target;
+ if (write) {
+ target = keyBufferForWrite;
+ } else {
+ target = keyBufferForRead;
+ }
+ keySerde.serialize(key, target);
+ return target.toSlice();
+ }
+
+ /**
+ * Serializes a value. Values are only ever serialized for writing, so the value buffer is always used.
+ *
+ * @param value the value to serialize
+ * @return the serialized value
+ */
+ public Slice serializeValue(V value)
+ {
+ SerializationBuffer target = valueBuffer;
+ valueSerde.serialize(value, target);
+ return target.toSlice();
+ }
+
+ /**
+ * Forwards {@code beginWindow} to the key write buffer, then to the value buffer.
+ *
+ * @param windowId the window that is starting
+ */
+ public void beginWindow(long windowId)
+ {
+ keyBufferForWrite.beginWindow(windowId);
+ valueBuffer.beginWindow(windowId);
+ }
+
+ /**
+ * Calls {@code release()} on the read buffer used for lookup keys.
+ */
+ public void resetReadBuffer()
+ {
+ keyBufferForRead.release();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
new file mode 100644
index 0000000..0b63737
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * This is an implementation of {@link Serde} which deserializes and serializes longs.
+ * Each value is written as an 8-byte long with {@link Output#writeLong(long)}.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class LongSerde implements Serde<Long>
+{
+ /**
+ * Writes {@code value} to {@code output}; a null value fails with NullPointerException on unboxing.
+ */
+ @Override
+ public void serialize(Long value, Output output)
+ {
+ output.writeLong(value);
+ }
+
+ /**
+ * Reads one long written by {@link #serialize(Long, Output)}.
+ */
+ @Override
+ public Long deserialize(Input input)
+ {
+ return input.readLong();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
new file mode 100644
index 0000000..3190880
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Serde} for {@link Pair}s. The left element is written with the first serde, immediately followed
+ * by the right element written with the second serde; deserialization reads them back in the same order
+ * and returns an {@link ImmutablePair}.
+ *
+ * @param <T1> type of the left element
+ * @param <T2> type of the right element
+ */
+@InterfaceStability.Evolving
+public class PairSerde<T1, T2> implements Serde<Pair<T1, T2>>
+{
+ @NotNull
+ private Serde<T1> serde1;
+ @NotNull
+ private Serde<T2> serde2;
+
+ private PairSerde()
+ {
+ // no-arg constructor required by Kryo
+ }
+
+ /**
+ * Creates a {@link PairSerde}.
+ *
+ * @param serde1 serde for the left (first) element of a pair
+ * @param serde2 serde for the right (second) element of a pair
+ * @throws NullPointerException if either serde is null
+ */
+ public PairSerde(@NotNull Serde<T1> serde1, @NotNull Serde<T2> serde2)
+ {
+ Preconditions.checkNotNull(serde1);
+ Preconditions.checkNotNull(serde2);
+ this.serde1 = serde1;
+ this.serde2 = serde2;
+ }
+
+ @Override
+ public void serialize(Pair<T1, T2> pair, Output output)
+ {
+ T1 left = pair.getLeft();
+ T2 right = pair.getRight();
+ serde1.serialize(left, output);
+ serde2.serialize(right, output);
+ }
+
+ @Override
+ public Pair<T1, T2> deserialize(Input input)
+ {
+ // arguments are evaluated left to right, so the left element is read first
+ return ImmutablePair.of(serde1.deserialize(input), serde2.deserialize(input));
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
deleted file mode 100644
index 9669981..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This is a simple pass through {@link Serde}. When serialization is performed the input byte array is returned.
- * Similarly when deserialization is performed the input byte array is returned.
- *
- * @since 3.4.0
- */
-@InterfaceStability.Evolving
-public class PassThruByteArraySerde implements Serde<byte[], byte[]>
-{
- @Override
- public byte[] serialize(byte[] object)
- {
- return object;
- }
-
- @Override
- public byte[] deserialize(byte[] object, MutableInt offset)
- {
- offset.add(object.length);
- return object;
- }
-
- @Override
- public byte[] deserialize(byte[] object)
- {
- return object;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
deleted file mode 100644
index b22bf6f..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is a simple {@link Serde} which serializes and deserializes byte arrays to {@link Slice}s. A byte array is
- * serialized by simply wrapping it in a {@link Slice} object and deserialized by simply reading the byte array
- * out of the {@link Slice} object.
- *
- * <b>Note:</b> The deserialized method doesn't use the offset argument in this implementation.
- *
- * @since 3.5.0
- */
-public class PassThruByteArraySliceSerde implements Serde<byte[], Slice>
-{
- @Override
- public Slice serialize(byte[] object)
- {
- return new Slice(object);
- }
-
- @Override
- public byte[] deserialize(Slice object, MutableInt offset)
- {
- offset.add(object.length);
-
- if (object.offset == 0) {
- return object.buffer;
- }
-
- byte[] bytes = new byte[object.length];
- System.arraycopy(object.buffer, object.offset, bytes, 0, object.length);
- return bytes;
- }
-
- @Override
- public byte[] deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
index 2646c0e..679e116 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
@@ -18,9 +18,14 @@
*/
package org.apache.apex.malhar.lib.utils.serde;
-import org.apache.commons.lang3.mutable.MutableInt;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceStability;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Throwables;
+
import com.datatorrent.netlet.util.Slice;
/**
@@ -30,23 +35,26 @@ import com.datatorrent.netlet.util.Slice;
* @since 3.5.0
*/
@InterfaceStability.Evolving
-public class PassThruSliceSerde implements Serde<Slice, Slice>
+public class PassThruSliceSerde implements Serde<Slice>
{
+ /**
+ * Writes the bytes covered by {@code slice} ({@code slice.length} bytes starting at {@code slice.offset})
+ * to {@code output}, without any length prefix.
+ */
 @Override
- public Slice serialize(Slice object)
- {
- return object;
- }
-
- @Override
- public Slice deserialize(Slice object, MutableInt offset)
+ public void serialize(Slice slice, Output output)
{
- return object;
+ output.write(slice.buffer, slice.offset, slice.length);
 }
+ /**
+ * Returns the remaining content of {@code input} as a {@link Slice}. For a stream-backed input the
+ * {@code input.available()} bytes are copied into a new array; otherwise the slice shares the input's
+ * buffer from the current position up to the limit.
+ * NOTE(review): in the buffer-backed case the input position is not advanced, and for streams
+ * {@code available()} may not cover all remaining data; confirm callers rely on neither.
+ */
 @Override
- public Slice deserialize(Slice object)
+ public Slice deserialize(Input input)
{
- return object;
+ if (input.getInputStream() != null) {
+ // The input is backed by a stream, cannot directly use its internal buffer
+ try {
+ return new Slice(input.readBytes(input.available()));
+ } catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+ } else {
+ // zero-copy: the returned slice references the input's own buffer
+ return new Slice(input.getBuffer(), input.position(), input.limit() - input.position());
+ }
 }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
index 6e02aee..d09612d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
@@ -18,46 +18,29 @@
*/
package org.apache.apex.malhar.lib.utils.serde;
-import org.apache.commons.lang3.mutable.MutableInt;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
/**
* This is an interface for a Serializer/Deserializer class.
- * @param <OBJ> The type of the object to Serialize and Deserialize.
- * @param <SER> The type to Serialize an Object to.
+ * @param <T> The type of the object to Serialize and Deserialize.
*
* @since 3.4.0
*/
-public interface Serde<OBJ, SER>
+public interface Serde<T>
{
 /**
- * Serialized the given object.
- * @param object The object to serialize.
- * @return The serialized representation of the object.
+ * Serializes {@code object} by writing its representation to {@code output}.
+ * @param object the object to serialize
+ * @param output the Kryo output the serialized bytes are written to
 */
- SER serialize(OBJ object);
+ void serialize(T object, Output output);
 /**
- * Deserializes the given serialized representation of an object.
- * @param object The serialized representation of an object.
- * @param offset An offset in the serialized representation of the object. After the
- * deserialize method completes the offset is updated, so that the offset points to
- * the remaining unprocessed portion of the serialized object. For example:<br/>
- * {@code
- * Object obj;
- * MutableInt mi;
- * someObj1 = deserialize(obj, mi);
- * someObj2 = deserialize(obj, mi);
- * }
+ * Deserializes an object by reading its serialized representation from {@code input}.
 *
- * @return The deserialized object.
+ * @param input the Kryo input to read from
+ * @return the deserialized object
 */
- OBJ deserialize(SER object, MutableInt offset);
-
- /**
- * Deserializes the given serialized representation of an object.
- * @param object The serialized representation of an object.
- *
- * @return The deserialized object.
- */
- OBJ deserialize(SER object);
+ T deserialize(Input input);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
deleted file mode 100644
index eca1d5f..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.Collection;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes lists.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeCollectionSlice<T, CollectionT extends Collection<T>> implements Serde<CollectionT, Slice>
-{
- @NotNull
- private Serde<T, Slice> serde;
-
- @NotNull
- private Class<? extends CollectionT> collectionClass;
-
- private SerdeCollectionSlice()
- {
- // for Kryo
- }
-
- /**
- * Creates a {@link SerdeCollectionSlice}.
- * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
- */
- public SerdeCollectionSlice(@NotNull Serde<T, Slice> serde, @NotNull Class<? extends CollectionT> collectionClass)
- {
- this.serde = Preconditions.checkNotNull(serde);
- this.collectionClass = Preconditions.checkNotNull(collectionClass);
- }
-
- @Override
- public Slice serialize(CollectionT objects)
- {
- Slice[] slices = new Slice[objects.size()];
-
- int size = 4;
-
- int index = 0;
- for (T object : objects) {
- Slice slice = serde.serialize(object);
- slices[index++] = slice;
- size += slice.length;
- }
-
- byte[] bytes = new byte[size];
- int offset = 0;
-
- byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
- System.arraycopy(sizeBytes, 0, bytes, offset, 4);
- offset += 4;
-
- for (index = 0; index < slices.length; index++) {
- Slice slice = slices[index];
- System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length);
- offset += slice.length;
- }
-
- return new Slice(bytes);
- }
-
- @Override
- public CollectionT deserialize(Slice slice, MutableInt offset)
- {
- MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
-
- int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
- sliceOffset.subtract(slice.offset);
- try {
- CollectionT collection = collectionClass.newInstance();
-
- for (int index = 0; index < numElements; index++) {
- T object = serde.deserialize(slice, sliceOffset);
- collection.add(object);
- }
-
- offset.setValue(sliceOffset.intValue());
- return collection;
- } catch (Exception ex) {
- throw Throwables.propagate(ex);
- }
- }
-
- @Override
- public CollectionT deserialize(Slice slice)
- {
- return deserialize(slice, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
deleted file mode 100644
index 3275a93..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which deserializes and serializes integers.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeIntSlice implements Serde<Integer, Slice>
-{
- @Override
- public Slice serialize(Integer object)
- {
- return new Slice(GPOUtils.serializeInt(object));
- }
-
- @Override
- public Integer deserialize(Slice slice, MutableInt offset)
- {
- int val = GPOUtils.deserializeInt(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
- offset.add(4);
- return val;
- }
-
- @Override
- public Integer deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
deleted file mode 100644
index d4b9488..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.io.ByteArrayOutputStream;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
- * using Kryo makes the object being serialized rigid: changing the class being serialized (whether intended to be
- * backward compatible or not) risks making previously serialized data unreadable.
- *
- * @param <T> The type being serialized
- */
-@InterfaceStability.Evolving
-public class SerdeKryoSlice<T> implements Serde<T, Slice>
-{
- // One Kryo instance per thread (ThreadLocal), created lazily on first use by that thread
- private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
- {
- protected Kryo initialValue()
- {
- Kryo kryo = new Kryo();
- // configure kryo instance, customize settings
- return kryo;
- }
- };
-
- private final Class<? extends T> clazz; // null means the class is written along with each object
-
- public SerdeKryoSlice()
- {
- this.clazz = null;
- }
-
- public SerdeKryoSlice(Class<? extends T> clazz)
- {
- this.clazz = clazz;
- }
-
- @Override
- public Slice serialize(T object)
- {
- Kryo kryo = kryos.get();
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- Output output = new Output(stream);
- if (clazz == null) {
- kryo.writeClassAndObject(output, object);
- } else {
- kryo.writeObject(output, object);
- }
- return new Slice(output.toBytes()); // NOTE(review): toBytes() returns only bytes still in Output's buffer, not bytes already flushed to 'stream'; large objects may be truncated - confirm
- }
-
- @Override
- public T deserialize(Slice slice, MutableInt offset)
- {
- byte[] bytes = slice.toByteArray(); // copy of the slice's bytes; offset is interpreted relative to this copy
- Kryo kryo = kryos.get();
- Input input = new Input(bytes, offset.intValue(), bytes.length - offset.intValue());
- T object;
- if (clazz == null) {
- object = (T)kryo.readClassAndObject(input); // unchecked cast to the caller's type
- } else {
- object = kryo.readObject(input, clazz);
- }
- offset.setValue(bytes.length - input.position()); // NOTE(review): stores the number of unread bytes, not the position after the object (input.position()); chained readers such as SerdePairSlice would resume at the wrong place - confirm
- return object;
- }
-
- @Override
- public T deserialize(Slice slice)
- {
- return deserialize(slice, new MutableInt(0)); // read from the start of the slice
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
deleted file mode 100644
index 6fe07d9..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which deserializes and serializes longs.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeLongSlice implements Serde<Long, Slice>
-{
- @Override
- public Slice serialize(Long object)
- {
- return new Slice(GPOUtils.serializeLong(object)); // encoding is delegated to GPOUtils
- }
-
- @Override
- public Long deserialize(Slice slice, MutableInt offset)
- {
- long val = GPOUtils.deserializeLong(slice.buffer, new MutableInt(slice.offset + offset.intValue())); // offset is relative to slice.offset
- offset.add(8); // advance the caller's offset past the 8 bytes just read
- return val;
- }
-
- @Override
- public Long deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0)); // read from the start of the slice
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
deleted file mode 100644
index 59cf282..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes pairs by concatenating the serialized left and right elements.
- */
-@InterfaceStability.Evolving
-public class SerdePairSlice<T1, T2> implements Serde<Pair<T1, T2>, Slice>
-{
- @NotNull
- private Serde<T1, Slice> serde1;
- @NotNull
- private Serde<T2, Slice> serde2;
-
- private SerdePairSlice()
- {
- // for Kryo
- }
-
- /**
- * Creates a {@link SerdePairSlice}.
- * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair
- * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair
- */
- public SerdePairSlice(@NotNull Serde<T1, Slice> serde1, @NotNull Serde<T2, Slice> serde2)
- {
- this.serde1 = Preconditions.checkNotNull(serde1);
- this.serde2 = Preconditions.checkNotNull(serde2);
- }
-
- @Override
- public Slice serialize(Pair<T1, T2> pair)
- {
- int size = 0;
-
- Slice slice1 = serde1.serialize(pair.getLeft());
- size += slice1.length;
- Slice slice2 = serde2.serialize(pair.getRight());
- size += slice2.length;
-
- byte[] bytes = new byte[size]; // layout: left bytes immediately followed by right bytes, no length prefix
- System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length);
- System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length);
-
- return new Slice(bytes);
- }
-
- @Override
- public Pair<T1, T2> deserialize(Slice slice, MutableInt offset)
- {
- T1 first = serde1.deserialize(slice, offset); // relies on serde1 advancing offset past the first element
- T2 second = serde2.deserialize(slice, offset); // continues from where serde1 stopped
- return new ImmutablePair<>(first, second);
- }
-
- @Override
- public Pair<T1, T2> deserialize(Slice slice)
- {
- return deserialize(slice, new MutableInt(0)); // start at the beginning of the slice
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
deleted file mode 100644
index aaf0d61..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * An implementation of {@link Serde} which serializes and deserializes {@link String}s.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeStringSlice implements Serde<String, Slice>
-{
- @Override
- public Slice serialize(String object)
- {
- return new Slice(GPOUtils.serializeString(object)); // encoding is delegated to GPOUtils
- }
-
- @Override
- public String deserialize(Slice object, MutableInt offset)
- {
- offset.add(object.offset); // turn the slice-relative offset into an absolute index into object.buffer
- String string = GPOUtils.deserializeString(object.buffer, offset); // presumably advances offset past the string - confirm
- offset.subtract(object.offset); // convert back to an offset relative to the slice
- return string;
- }
-
- @Override
- public String deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0)); // read from the start of the slice
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
new file mode 100644
index 0000000..f33f1e0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
+
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Kryo {@link Output} that writes into a {@link WindowedBlockStream}, so the memory backing
+ * serialization can be reused; window lifecycle callbacks are forwarded to that stream.
+ * <p>
+ * Write one whole object, then call {@link #toSlice()} to obtain the slice that represents it.
+ * NOTE(review): {@link #READ_BUFFER} is a shared static instance and nothing in this class
+ * synchronizes access to it; presumably it is only used from one thread at a time - confirm.
+ */
+public class SerializationBuffer extends Output implements WindowCompleteListener, WindowListener
+{
+  /*
+   * Singleton read buffer for serialization
+   */
+  public static final SerializationBuffer READ_BUFFER = new SerializationBuffer(new WindowedBlockStream());
+
+  private WindowedBlockStream windowedBlockStream;
+
+  /**
+   * Creates a buffer backed by a fresh {@link WindowedBlockStream}. Presumably kept for
+   * reflective instantiation (e.g. by Kryo) - confirm.
+   */
+  @SuppressWarnings("unused")
+  private SerializationBuffer()
+  {
+    this(new WindowedBlockStream());
+  }
+
+  /**
+   * Creates a buffer that writes into the given stream.
+   *
+   * @param windowedBlockStream the stream that receives the serialized bytes
+   */
+  public SerializationBuffer(WindowedBlockStream windowedBlockStream)
+  {
+    super(windowedBlockStream);
+    this.windowedBlockStream = windowedBlockStream;
+  }
+
+  /**
+   * @return the size reported by the underlying stream; bytes written to this Output but not yet
+   *         flushed from its own buffer are presumably not included
+   */
+  public long size()
+  {
+    return windowedBlockStream.size();
+  }
+
+  /**
+   * @return the capacity reported by the underlying stream
+   */
+  public long capacity()
+  {
+    return windowedBlockStream.capacity();
+  }
+
+  /**
+   * Flushes any bytes still buffered in this Output into the stream, then returns the stream's
+   * slice. This method should be called only after the whole object has been written.
+   *
+   * @return The slice which represents the object
+   */
+  public Slice toSlice()
+  {
+    this.flush();
+    return windowedBlockStream.toSlice();
+  }
+
+  /**
+   * Resets the environment to reuse the resource. Bytes written to this Output but not yet
+   * flushed are discarded as well, so they cannot end up in front of the next object.
+   */
+  public void reset()
+  {
+    clear();
+    windowedBlockStream.reset();
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    windowedBlockStream.beginWindow(windowId);
+  }
+
+  @Override
+  public void endWindow()
+  {
+    windowedBlockStream.endWindow();
+  }
+
+  /**
+   * Releases the buffered data so the buffer can be reused; equivalent to {@link #reset()},
+   * which already resets the underlying stream.
+   */
+  public void release()
+  {
+    reset();
+  }
+
+  /**
+   * @return a new {@link WindowedBlockStream}; it is not attached to this buffer
+   */
+  public WindowedBlockStream createWindowedBlockStream()
+  {
+    return new WindowedBlockStream();
+  }
+
+  /**
+   * @param capacity passed to the {@link WindowedBlockStream} constructor
+   * @return a new {@link WindowedBlockStream}; it is not attached to this buffer
+   */
+  public WindowedBlockStream createWindowedBlockStream(int capacity)
+  {
+    return new WindowedBlockStream(capacity);
+  }
+
+  /**
+   * @return the stream this buffer currently writes into
+   */
+  public WindowedBlockStream getWindowedBlockStream()
+  {
+    return windowedBlockStream;
+  }
+
+  /**
+   * Replaces the stream this buffer writes into. Bytes still buffered in this Output are first
+   * flushed to the old stream; later writes, {@link #size()}, {@link #toSlice()} and the window
+   * callbacks all use the new stream.
+   *
+   * @param windowableByteStream the new target stream
+   */
+  public void setWindowableByteStream(WindowedBlockStream windowableByteStream)
+  {
+    // The Kryo Output holds its own reference to the target stream, so redirect it too; changing
+    // only the field would keep writes going to the old stream while toSlice() reads the new one.
+    flush();
+    setOutputStream(windowableByteStream);
+    this.windowedBlockStream = windowableByteStream;
+  }
+
+  /**
+   * Resets the data of all windows whose id is less than or equal to {@code windowId}. Windows are
+   * not reset one at a time: a single call may reset several windows.
+   *
+   * @param windowId inclusive upper bound on the ids of the windows to reset
+   */
+  @Override
+  public void completeWindow(long windowId)
+  {
+    windowedBlockStream.completeWindow(windowId);
+  }
+
+  /**
+   * @return the bytes of the object just written, obtained via {@link #toSlice()}
+   */
+  public byte[] toByteArray()
+  {
+    return toSlice().toByteArray();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
index 2671d5e..b504581 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
@@ -100,4 +100,14 @@ public class SliceUtils
return new Slice(bytes);
}
+
+  public static BufferSlice toBufferSlice(Slice slice)
+  {
+    // Slice's own hashCode was considered incorrect, so a plain Slice is wrapped in a BufferSlice
+    // (expected to supply a correct one); an existing BufferSlice is handed back unchanged.
+    if (!(slice instanceof BufferSlice)) {
+      return new BufferSlice(slice);
+    }
+    return (BufferSlice)slice;
+  }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
new file mode 100644
index 0000000..cb45e2a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * {@link Serde} for {@link String} values that delegates to Kryo's own string encoding.
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class StringSerde implements Serde<String>
+{
+  @Override
+  public void serialize(String string, Output output)
+  {
+    output.writeString(string);
+  }
+
+  @Override
+  public String deserialize(Input input)
+  {
+    final String decoded = input.readString();
+    return decoded;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/6ddefd02/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
new file mode 100644
index 0000000..d2d38a7
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+public interface WindowCompleteListener
+{
+  /**
+   * Notifies that every window whose id is less than or equal to {@code windowId} is complete.
+   *
+   * @param windowId inclusive upper bound on the ids of the windows that are complete
+   */
+  void completeWindow(long windowId);
+}