You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/10/24 20:38:56 UTC
[1/3] apex-malhar git commit: APEXMALHAR-2190 #resolve #comment Use
reusable buffer for serialization in spillable data structures
Repository: apex-malhar
Updated Branches:
refs/heads/master 37991576d -> 2fa1e6b16
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
new file mode 100644
index 0000000..fa4cd73
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowedBlockStream.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.SetMultimap;
+import com.google.common.collect.Sets;
+
+/**
+ * A {@link BlockStream} that records which blocks were registered for which window, so the
+ * blocks of completed windows can be reset as a group and then reused or released.
+ *
+ * Blocks reset by {@link #completeWindow(long)} are put on a free list; {@link #moveToNextBlock()}
+ * takes blocks from that list before allocating new ones, and {@link #releaseMemory()} drops free
+ * blocks according to the configured {@link BlockReleaseStrategy}.
+ */
+public class WindowedBlockStream extends BlockStream implements WindowListener, WindowCompleteListener
+{
+  private static final Logger logger = LoggerFactory.getLogger(WindowedBlockStream.class);
+
+  /**
+   * Map from windowId to the ids of the blocks registered for that window.
+   */
+  protected SetMultimap<Long, Integer> windowToBlockIds = HashMultimap.create();
+
+  /**
+   * Ids of the blocks that have been reset and are available for reuse or release.
+   */
+  protected Set<Integer> freeBlockIds = Sets.newHashSet();
+
+  // highest block index handed out so far; must be >= 0
+  protected int maxBlockIndex = 0;
+
+  // id of the window most recently passed to beginWindow()
+  protected long currentWindowId;
+
+  /**
+   * Guards the block bookkeeping: adding, removing and reassigning block(s).
+   */
+  protected transient ReadWriteLock lock = new ReentrantReadWriteLock();
+
+  protected BlockReleaseStrategy releaseStrategy = new DefaultBlockReleaseStrategy();
+
+  public WindowedBlockStream()
+  {
+    super();
+  }
+
+  public WindowedBlockStream(int blockCapacity)
+  {
+    super(blockCapacity);
+  }
+
+  /**
+   * Records the new window id and makes sure the window starts on a block that holds no data.
+   *
+   * @param windowId id of the window that is starting
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+    moveToNextWindow();
+  }
+
+  /**
+   * Makes sure different windows do not share data in a block: the current block is kept if it
+   * holds no data, otherwise the stream moves on to the next block. The resulting current block
+   * is registered under the current window.
+   *
+   * @throws RuntimeException if the current block is not clear, i.e. pending data has not been
+   *           output with toSlice() yet
+   */
+  protected void moveToNextWindow()
+  {
+    // windowToBlockIds is mutated under the write lock everywhere else, so take it here as well.
+    // The lock is reentrant, which makes the nested moveToNextBlock() call safe.
+    lock.writeLock().lock();
+    try {
+      //use current block if it hasn't been used, else, move to next block
+      Block block = getOrCreateCurrentBlock();
+      if (!block.isClear()) {
+        throw new RuntimeException("Current block not clear, should NOT move to next window. Please call toSlice() to output data first");
+      }
+      if (block.size() > 0) {
+        moveToNextBlock();
+      }
+      windowToBlockIds.put(currentWindowId, currentBlockIndex);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * This method tries to use a free block first. Allocate a new block if there
+   * are no free blocks. The chosen block is registered under the current window.
+   *
+   * @return The previous block
+   */
+  @Override
+  protected Block moveToNextBlock()
+  {
+    lock.writeLock().lock();
+    try {
+      Block previousBlock = currentBlock;
+      Iterator<Integer> freeIds = freeBlockIds.iterator();
+      if (freeIds.hasNext()) {
+        // reuse an arbitrary free block and take it off the free list
+        currentBlockIndex = freeIds.next();
+        freeIds.remove();
+        currentBlock = this.blocks.get(currentBlockIndex);
+      } else {
+        currentBlockIndex = ++maxBlockIndex;
+        currentBlock = getOrCreateCurrentBlock();
+      }
+      windowToBlockIds.put(currentWindowId, currentBlockIndex);
+      return previousBlock;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Gives the release strategy a chance to release free blocks at the end of each window.
+   */
+  @Override
+  public void endWindow()
+  {
+    releaseMemory();
+  }
+
+  /**
+   * Resets the blocks of every tracked window whose id is less than or equal to the given id.
+   *
+   * @param windowId id of the latest completed window
+   */
+  @Override
+  public void completeWindow(long windowId)
+  {
+    lock.writeLock().lock();
+    try {
+      // iterate over a copy: resetWindow() removes keys from windowToBlockIds
+      Set<Long> windIds = Sets.newHashSet(windowToBlockIds.keySet());
+      for (long windId : windIds) {
+        if (windId <= windowId) {
+          resetWindow(windId);
+        }
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Resets all blocks registered for the given window, moves them to the free list and
+   * subtracts their data size from the stream size.
+   *
+   * @param windowId id of the window whose blocks should be reset
+   */
+  protected void resetWindow(long windowId)
+  {
+    lock.writeLock().lock();
+    try {
+      Set<Integer> removedBlockIds = windowToBlockIds.removeAll(windowId);
+
+      long removedSize = 0;
+      for (int blockId : removedBlockIds) {
+        Block theBlock = blocks.get(blockId);
+        // read the size before reset() discards the block's data
+        removedSize += theBlock.size();
+        theBlock.reset();
+        if (theBlock == currentBlock) {
+          //the client code could ask reset up to current window
+          //but the reset block should not be current block. current block should be reassigned.
+          moveToNextBlock();
+        }
+        logger.debug("reset block: {}, currentBlock: {}", blockId, theBlock);
+      }
+
+      freeBlockIds.addAll(removedBlockIds);
+      size -= removedSize;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Resets the underlying stream and puts every block except the current one on the free list.
+   */
+  @Override
+  public void reset()
+  {
+    lock.writeLock().lock();
+    try {
+      super.reset();
+
+      //all blocks are free now except the current one
+      // NOTE(review): windowToBlockIds is not cleared here, so blocks placed on the free list
+      // stay registered under their earlier windows - confirm this is intended.
+      freeBlockIds.addAll(blocks.keySet());
+      freeBlockIds.remove(currentBlockIndex);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * The size of the data of all windows with id less than or equals to windowId
+   *
+   * @param windowId upper bound (inclusive) of the window ids to take into account
+   * @return the sum of the data sizes of all tracked windows up to and including windowId
+   */
+  public long dataSizeUpToWindow(long windowId)
+  {
+    lock.readLock().lock();
+    try {
+      long totalSize = 0;
+      for (long winId : windowToBlockIds.keySet()) {
+        // only windows up to the requested one count; later windows are skipped
+        if (winId <= windowId) {
+          totalSize += dataSizeOfWindow(winId);
+        }
+      }
+      return totalSize;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * The size of the data held by the blocks registered for a single window.
+   *
+   * @param windowId id of the window
+   * @return the sum of the sizes of the window's blocks; 0 if the window is not tracked
+   */
+  protected long dataSizeOfWindow(long windowId)
+  {
+    lock.readLock().lock();
+    try {
+      long sizeOfWindow = 0;
+      // a multimap returns an empty collection, never null, for an unknown key
+      for (int blockId : windowToBlockIds.get(windowId)) {
+        sizeOfWindow += blocks.get(blockId).size();
+      }
+      return sizeOfWindow;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Reports the number of free blocks to the release strategy, releases as many free blocks as
+   * the strategy asks for (at most all of them), and reports how many were actually released.
+   */
+  public void releaseMemory()
+  {
+    // removing blocks must not interleave with moveToNextBlock()/resetWindow(), which touch the
+    // same collections under the write lock
+    lock.writeLock().lock();
+    try {
+      /**
+       * report and release extra blocks
+       */
+      releaseStrategy.currentFreeBlocks(freeBlockIds.size());
+      int releasingBlocks = Math.min(releaseStrategy.getNumBlocksToRelease(), freeBlockIds.size());
+      int releasedBlocks = 0;
+      Iterator<Integer> iter = freeBlockIds.iterator();
+      while (releasedBlocks < releasingBlocks) {
+        //release blocks
+        int blockId = iter.next();
+        iter.remove();
+        blocks.remove(blockId);
+        releasedBlocks++;
+      }
+
+      /**
+       * report number of released blocks
+       */
+      if (releasedBlocks > 0) {
+        releaseStrategy.releasedBlocks(releasedBlocks);
+      }
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
index da44fb1..b88501e 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableSessionWindowedStorage.java
@@ -34,7 +34,6 @@ import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* Spillable session windowed storage.
@@ -53,7 +52,7 @@ public class SpillableSessionWindowedStorage<K, V> extends SpillableWindowedKeye
if (keyToWindowsMap == null) {
// NOTE: this will pose difficulties when we try to assign the entries to a time bucket later on.
// This is logged in APEXMALHAR-2271
- keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>, Slice>)(Serde)windowSerde);
+ keyToWindowsMap = scc.newSpillableSetMultimap(bucket, keySerde, (Serde<Window.SessionWindow<K>>)(Serde)windowSerde);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
index ac386ab..ef111b3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedKeyedStorage.java
@@ -27,15 +27,14 @@ import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* Implementation of WindowedKeyedStorage using {@link Spillable} data structures
@@ -48,10 +47,10 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
@NotNull
protected SpillableComplexComponent scc;
protected long bucket;
- protected Serde<Window, Slice> windowSerde;
- protected Serde<Pair<Window, K>, Slice> windowKeyPairSerde;
- protected Serde<K, Slice> keySerde;
- protected Serde<V, Slice> valueSerde;
+ protected Serde<Window> windowSerde;
+ protected Serde<Pair<Window, K>> windowKeyPairSerde;
+ protected Serde<K> keySerde;
+ protected Serde<V> valueSerde;
protected Spillable.SpillableMap<Pair<Window, K>, V> windowKeyToValueMap;
protected Spillable.SpillableSetMultimap<Window, K> windowToKeysMap;
@@ -96,7 +95,7 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
}
public SpillableWindowedKeyedStorage(long bucket,
- Serde<Window, Slice> windowSerde, Serde<Pair<Window, K>, Slice> windowKeyPairSerde, Serde<K, Slice> keySerde, Serde<V, Slice> valueSerde)
+ Serde<Window> windowSerde, Serde<Pair<Window, K>> windowKeyPairSerde, Serde<K> keySerde, Serde<V> valueSerde)
{
this.bucket = bucket;
this.windowSerde = windowSerde;
@@ -120,17 +119,17 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
this.bucket = bucket;
}
- public void setWindowSerde(Serde<Window, Slice> windowSerde)
+ public void setWindowSerde(Serde<Window> windowSerde)
{
this.windowSerde = windowSerde;
}
- public void setWindowKeyPairSerde(Serde<Pair<Window, K>, Slice> windowKeyPairSerde)
+ public void setWindowKeyPairSerde(Serde<Pair<Window, K>> windowKeyPairSerde)
{
this.windowKeyPairSerde = windowKeyPairSerde;
}
- public void setValueSerde(Serde<V, Slice> valueSerde)
+ public void setValueSerde(Serde<V> valueSerde)
{
this.valueSerde = valueSerde;
}
@@ -168,16 +167,16 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
}
// set default serdes
if (windowSerde == null) {
- windowSerde = new SerdeKryoSlice<>();
+ windowSerde = new GenericSerde<>();
}
if (windowKeyPairSerde == null) {
- windowKeyPairSerde = new SerdeKryoSlice<>();
+ windowKeyPairSerde = new GenericSerde<>();
}
if (keySerde == null) {
- keySerde = new SerdeKryoSlice<>();
+ keySerde = new GenericSerde<>();
}
if (valueSerde == null) {
- valueSerde = new SerdeKryoSlice<>();
+ valueSerde = new GenericSerde<>();
}
if (windowKeyToValueMap == null) {
@@ -220,5 +219,4 @@ public class SpillableWindowedKeyedStorage<K, V> implements WindowedStorage.Wind
{
return windowKeyToValueMap.get(new ImmutablePair<>(window, key));
}
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
index 6666381..9a8a291 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/SpillableWindowedPlainStorage.java
@@ -24,13 +24,12 @@ import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.spillable.Spillable;
import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
+import org.apache.apex.malhar.lib.utils.serde.GenericSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeKryoSlice;
import org.apache.apex.malhar.lib.window.Window;
import org.apache.apex.malhar.lib.window.WindowedStorage;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* This is an implementation of WindowedPlainStorage that makes use of {@link Spillable} data structures
@@ -42,8 +41,8 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
@NotNull
private SpillableComplexComponent scc;
private long bucket;
- private Serde<Window, Slice> windowSerde;
- private Serde<T, Slice> valueSerde;
+ private Serde<Window> windowSerde;
+ private Serde<T> valueSerde;
protected Spillable.SpillableMap<Window, T> windowToDataMap;
@@ -51,7 +50,7 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
{
}
- public SpillableWindowedPlainStorage(long bucket, Serde<Window, Slice> windowSerde, Serde<T, Slice> valueSerde)
+ public SpillableWindowedPlainStorage(long bucket, Serde<Window> windowSerde, Serde<T> valueSerde)
{
this.bucket = bucket;
this.windowSerde = windowSerde;
@@ -73,12 +72,12 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
this.bucket = bucket;
}
- public void setWindowSerde(Serde<Window, Slice> windowSerde)
+ public void setWindowSerde(Serde<Window> windowSerde)
{
this.windowSerde = windowSerde;
}
- public void setValueSerde(Serde<T, Slice> valueSerde)
+ public void setValueSerde(Serde<T> valueSerde)
{
this.valueSerde = valueSerde;
}
@@ -128,10 +127,10 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
}
// set default serdes
if (windowSerde == null) {
- windowSerde = new SerdeKryoSlice<>();
+ windowSerde = new GenericSerde<>();
}
if (valueSerde == null) {
- valueSerde = new SerdeKryoSlice<>();
+ valueSerde = new GenericSerde<>();
}
if (windowToDataMap == null) {
windowToDataMap = scc.newSpillableMap(bucket, windowSerde, valueSerde);
@@ -142,5 +141,4 @@ public class SpillableWindowedPlainStorage<T> implements WindowedStorage.Windowe
public void teardown()
{
}
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 403072d..92937a9 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -24,6 +24,7 @@ import java.util.List;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
import org.apache.commons.io.FileUtils;
import com.google.common.base.Preconditions;
@@ -57,7 +58,7 @@ public class TestUtils
public static Slice getSlice(int val)
{
- return new Slice(getBytes(val));
+ return new BufferSlice(getBytes(val));
}
public static class TestInfo extends TestWatcher
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
index 2058b69..6645a98 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -28,6 +28,12 @@ import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.apex.malhar.lib.state.managed.Bucket.ReadSource;
+import org.apache.apex.malhar.lib.utils.serde.AffixSerde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
import com.google.common.primitives.Longs;
import com.datatorrent.lib.fileaccess.FileAccess;
@@ -82,6 +88,7 @@ public class DefaultBucketTest
Assert.assertNull("value not present", value);
Assert.assertEquals("size of bucket", one.length * 2 + Longs.BYTES, testMeta.defaultBucket.getSizeInBytes());
+
testMeta.defaultBucket.teardown();
}
@@ -126,7 +133,6 @@ public class DefaultBucketTest
Slice one = ManagedStateTestUtils.getSliceFor("1");
testPut();
Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10);
- Assert.assertEquals("size", 1, unsaved.size());
Map.Entry<Slice, Bucket.BucketedValue> entry = unsaved.entrySet().iterator().next();
Assert.assertEquals("key", one, entry.getKey());
@@ -192,15 +198,39 @@ public class DefaultBucketTest
testGetFromReader();
long initSize = testMeta.defaultBucket.getSizeInBytes();
- Slice two = ManagedStateTestUtils.getSliceFor("2");
- testMeta.defaultBucket.put(two, 101, two);
-
- Assert.assertEquals("size", initSize + (two.length * 2 + Longs.BYTES ), testMeta.defaultBucket.getSizeInBytes());
+ //The temporary memory generated by get was not managed by bucket, only put was managed by bucket
+ SerializationBuffer buffer = new SerializationBuffer(testMeta.defaultBucket.getKeyStream());
+ byte[] keyPrefix = new byte[]{0};
+ String key = "1";
+ String value = "2";
+ AffixSerde<String> keySerde = new AffixSerde<>(keyPrefix, new StringSerde(), null);
+
+ StringSerde valueSerde = new StringSerde();
+
+ testMeta.defaultBucket.getKeyStream().beginWindow(1);
+ testMeta.defaultBucket.getValueStream().beginWindow(1);
+ keySerde.serialize(key, buffer);
+ Slice keySlice = buffer.toSlice();
+ valueSerde.serialize(value, buffer);
+ Slice valueSlice = buffer.toSlice();
+ testMeta.defaultBucket.put(keySlice, 1, valueSlice);
+ testMeta.defaultBucket.getKeyStream().endWindow();
+ testMeta.defaultBucket.getValueStream().endWindow();
+
+ long currentSize = testMeta.defaultBucket.getSizeInBytes();
+ testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
+ //call this method to invoke the release memory
+ testMeta.defaultBucket.get(keySlice, -1, ReadSource.MEMORY);
+ long sizeFreed = currentSize - testMeta.defaultBucket.getSizeInBytes();
+
+ SerializationBuffer tmpBuffer = new SerializationBuffer(new WindowedBlockStream());
+ tmpBuffer.writeBytes(keyPrefix);
+ tmpBuffer.writeString(key);
+ tmpBuffer.writeString(value);
+ int expectedFreedSize = tmpBuffer.toSlice().toByteArray().length; //key prefix, key length, key; value length, value
+ Assert.assertEquals("size freed", expectedFreedSize, sizeFreed);
+ Assert.assertEquals("existing size", currentSize - expectedFreedSize, testMeta.defaultBucket.getSizeInBytes());
- long sizeFreed = testMeta.defaultBucket.freeMemory(Long.MAX_VALUE);
- Assert.assertEquals("size freed", initSize, sizeFreed);
- Assert.assertEquals("existing size", (two.length * 2 + Longs.BYTES), testMeta.defaultBucket.getSizeInBytes());
testMeta.defaultBucket.teardown();
}
-
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
index 0d3f87a..86f8430 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -28,6 +28,7 @@ import org.junit.Assert;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.RemoteIterator;
@@ -128,6 +129,6 @@ public class ManagedStateTestUtils
public static Slice getSliceFor(String x)
{
- return new Slice(x.getBytes());
+ return new BufferSlice(x.getBytes());
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
index af05c88..5dd6404 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImplTest.java
@@ -23,7 +23,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
import com.google.common.collect.Lists;
@@ -58,7 +58,7 @@ public class SpillableArrayListImplTest
public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
{
SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
- new SerdeStringSlice(), 1);
+ new StringSerde(), 1);
store.setup(testMeta.operatorContext);
list.setup(testMeta.operatorContext);
@@ -177,7 +177,7 @@ public class SpillableArrayListImplTest
private void simpleAddGetAndSetTest3Helper(SpillableStateStore store)
{
SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
- new SerdeStringSlice(), 3);
+ new StringSerde(), 3);
store.setup(testMeta.operatorContext);
list.setup(testMeta.operatorContext);
@@ -321,10 +321,10 @@ public class SpillableArrayListImplTest
public void simpleMultiListTestHelper(SpillableStateStore store)
{
SpillableArrayListImpl<String> list1 = new SpillableArrayListImpl<>(0L, ID1, store,
- new SerdeStringSlice(), 1);
+ new StringSerde(), 1);
SpillableArrayListImpl<String> list2 = new SpillableArrayListImpl<>(0L, ID2, store,
- new SerdeStringSlice(), 1);
+ new StringSerde(), 1);
store.setup(testMeta.operatorContext);
list1.setup(testMeta.operatorContext);
@@ -483,7 +483,7 @@ public class SpillableArrayListImplTest
SpillableStateStore store = testMeta.store;
SpillableArrayListImpl<String> list = new SpillableArrayListImpl<>(0L, ID1, store,
- new SerdeStringSlice(), 3);
+ new StringSerde(), 3);
store.setup(testMeta.operatorContext);
list.setup(testMeta.operatorContext);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
index 82fb340..d21bf50 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImplTest.java
@@ -26,9 +26,11 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
import com.google.common.collect.Lists;
@@ -63,8 +65,8 @@ public class SpillableArrayListMultimapImplTest
public void simpleMultiKeyTestHelper(SpillableStateStore store)
{
SpillableArrayListMultimapImpl<String, String> map =
- new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new SerdeStringSlice(),
- new SerdeStringSlice());
+ new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, new StringSerde(),
+ new StringSerde());
store.setup(testMeta.operatorContext);
map.setup(testMeta.operatorContext);
@@ -112,11 +114,11 @@ public class SpillableArrayListMultimapImplTest
public long simpleMultiKeyTestHelper(SpillableStateStore store,
SpillableArrayListMultimapImpl<String, String> map, String key, long nextWindowId)
{
- SerdeStringSlice serdeString = new SerdeStringSlice();
- SerdeIntSlice serdeInt = new SerdeIntSlice();
-
- Slice keySlice = serdeString.serialize(key);
-
+ StringSerde serdeString = new StringSerde();
+ IntSerde serdeInt = new IntSerde();
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ serdeString.serialize(key, buffer);
+ Slice keySlice = buffer.toSlice();
byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
nextWindowId++;
@@ -249,7 +251,7 @@ public class SpillableArrayListMultimapImplTest
SpillableStateStore store = testMeta.store;
SpillableArrayListMultimapImpl<String, String> map =
- new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+ new SpillableArrayListMultimapImpl<>(store, ID1, 0L, new StringSerde(), new StringSerde());
store.setup(testMeta.operatorContext);
map.setup(testMeta.operatorContext);
@@ -323,8 +325,10 @@ public class SpillableArrayListMultimapImplTest
store.beginWindow(nextWindowId);
map.beginWindow(nextWindowId);
- SerdeStringSlice serdeString = new SerdeStringSlice();
- Slice keySlice = serdeString.serialize("a");
+ StringSerde serdeString = new StringSerde();
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ serdeString.serialize("a", buffer);
+ Slice keySlice = buffer.toSlice();
byte[] keyBytes = SliceUtils.concatenate(ID1, keySlice.toByteArray());
SpillableTestUtils.checkValue(store, 0L, keyBytes, 0, Lists.<String>newArrayList("a", "111", "b", "222", "d",
@@ -350,7 +354,7 @@ public class SpillableArrayListMultimapImplTest
SpillableStateStore store = testMeta.store;
SpillableArrayListMultimapImpl<String, String> multimap = new SpillableArrayListMultimapImpl<>(
- this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+ this.testMeta.store, ID1, 0L, new StringSerde(), new StringSerde());
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
index 5c477b1..29c2090 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImplTest.java
@@ -22,7 +22,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
public class SpillableComplexComponentImplTest
{
@@ -48,9 +48,9 @@ public class SpillableComplexComponentImplTest
SpillableComplexComponentImpl sccImpl = new SpillableComplexComponentImpl(store);
Spillable.SpillableComponent scList =
- (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new SerdeStringSlice());
+ (Spillable.SpillableComponent)sccImpl.newSpillableArrayList(0L, new StringSerde());
Spillable.SpillableComponent scMap =
- (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new SerdeStringSlice(), new SerdeStringSlice());
+ (Spillable.SpillableComponent)sccImpl.newSpillableMap(0L, new StringSerde(), new StringSerde());
sccImpl.setup(testMeta.operatorContext);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
index e8aea46..a96a8fd 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImplTest.java
@@ -23,7 +23,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
@@ -47,6 +47,7 @@ public class SpillableMapImplTest
simpleGetAndPutTestHelper(store);
}
+
@Test
public void simpleGetAndPutManagedStateTest()
{
@@ -55,11 +56,7 @@ public class SpillableMapImplTest
private void simpleGetAndPutTestHelper(SpillableStateStore store)
{
- SerdeStringSlice sss = new SerdeStringSlice();
-
- SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 0L,
- new SerdeStringSlice(),
- new SerdeStringSlice());
+ SpillableMapImpl<String, String> map = createSpillableMap(store);
store.setup(testMeta.operatorContext);
map.setup(testMeta.operatorContext);
@@ -157,23 +154,25 @@ public class SpillableMapImplTest
public void simpleRemoveTest()
{
InMemSpillableStateStore store = new InMemSpillableStateStore();
-
simpleRemoveTestHelper(store);
}
+
@Test
public void simpleRemoveManagedStateTest()
{
simpleRemoveTestHelper(testMeta.store);
}
- private void simpleRemoveTestHelper(SpillableStateStore store)
+ protected SpillableMapImpl<String, String> createSpillableMap(SpillableStateStore store)
{
- SerdeStringSlice sss = new SerdeStringSlice();
+ return new SpillableMapImpl<String, String>(store, ID1, 0L, new StringSerde(),
+ new StringSerde());
+ }
- SpillableMapImpl<String, String> map = new SpillableMapImpl<>(store, ID1, 0L,
- new SerdeStringSlice(),
- new SerdeStringSlice());
+ private void simpleRemoveTestHelper(SpillableStateStore store)
+ {
+ SpillableMapImpl<String, String> map = createSpillableMap(store);
store.setup(testMeta.operatorContext);
map.setup(testMeta.operatorContext);
@@ -324,14 +323,14 @@ public class SpillableMapImplTest
public void multiMapPerBucketTestHelper(SpillableStateStore store)
{
- SerdeStringSlice sss = new SerdeStringSlice();
+ StringSerde sss = new StringSerde();
SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(store, ID1, 0L,
- new SerdeStringSlice(),
- new SerdeStringSlice());
+ new StringSerde(),
+ new StringSerde());
SpillableMapImpl<String, String> map2 = new SpillableMapImpl<>(store, ID2, 0L,
- new SerdeStringSlice(),
- new SerdeStringSlice());
+ new StringSerde(),
+ new StringSerde());
store.setup(testMeta.operatorContext);
map1.setup(testMeta.operatorContext);
@@ -413,11 +412,11 @@ public class SpillableMapImplTest
@Test
public void recoveryWithManagedStateTest() throws Exception
{
- SerdeStringSlice sss = new SerdeStringSlice();
+ StringSerde sss = new StringSerde();
SpillableMapImpl<String, String> map1 = new SpillableMapImpl<>(testMeta.store, ID1, 0L,
- new SerdeStringSlice(),
- new SerdeStringSlice());
+ new StringSerde(),
+ new StringSerde());
testMeta.store.setup(testMeta.operatorContext);
map1.setup(testMeta.operatorContext);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
index 3883191..d0343e1 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImplTest.java
@@ -26,7 +26,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
import com.google.common.collect.Lists;
@@ -53,7 +53,7 @@ public class SpillableSetImplTest
public void simpleAddGetAndSetTest1Helper(SpillableStateStore store)
{
- SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new SerdeStringSlice());
+ SpillableSetImpl<String> set = new SpillableSetImpl<>(0L, ID1, store, new StringSerde());
store.setup(testMeta.operatorContext);
set.setup(testMeta.operatorContext);
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
index 15970af..2f80628 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImplTest.java
@@ -27,7 +27,8 @@ import org.junit.Rule;
import org.junit.Test;
import org.apache.apex.malhar.lib.state.spillable.inmem.InMemSpillableStateStore;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -62,8 +63,7 @@ public class SpillableSetMultimapImplTest
public void simpleMultiKeyTestHelper(SpillableStateStore store)
{
SpillableSetMultimapImpl<String, String> map =
- new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(),
- new SerdeStringSlice());
+ new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde());
store.setup(testMeta.operatorContext);
map.setup(testMeta.operatorContext);
@@ -201,7 +201,7 @@ public class SpillableSetMultimapImplTest
SpillableStateStore store = testMeta.store;
SpillableSetMultimapImpl<String, String> map =
- new SpillableSetMultimapImpl<>(store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+ new SpillableSetMultimapImpl<>(store, ID1, 0L, createStringSerde(), createStringSerde());
store.setup(testMeta.operatorContext);
map.setup(testMeta.operatorContext);
@@ -276,8 +276,9 @@ public class SpillableSetMultimapImplTest
final int numOfEntry = 100000;
SpillableStateStore store = testMeta.store;
- SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>(
- this.testMeta.store, ID1, 0L, new SerdeStringSlice(), new SerdeStringSlice());
+
+ SpillableSetMultimapImpl<String, String> multimap = new SpillableSetMultimapImpl<>(testMeta.store, ID1, 0L,
+ createStringSerde(), createStringSerde());
Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
@@ -294,4 +295,9 @@ public class SpillableSetMultimapImplTest
multimap.endWindow();
store.endWindow();
}
+
+ protected Serde<String> createStringSerde()
+ {
+ return new StringSerde();
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
index 36e3557..d72b1f9 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/SpillableTestUtils.java
@@ -27,14 +27,15 @@ import org.junit.runner.Description;
import org.apache.apex.malhar.lib.state.managed.ManagedStateTestUtils;
import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeStringSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.apex.malhar.lib.utils.serde.SerializationBuffer;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
+import com.esotericsoftware.kryo.io.Input;
import com.datatorrent.api.Context;
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
import com.datatorrent.lib.util.TestUtils;
import com.datatorrent.netlet.util.Slice;
@@ -44,9 +45,9 @@ import com.datatorrent.netlet.util.Slice;
*/
public class SpillableTestUtils
{
- public static SerdeStringSlice SERDE_STRING_SLICE = new SerdeStringSlice();
- public static SerdeCollectionSlice<String, List<String>> SERDE_STRING_LIST_SLICE = new SerdeCollectionSlice<>(new SerdeStringSlice(),
- (Class<List<String>>)(Class)ArrayList.class);
+ public static StringSerde STRING_SERDE = new StringSerde();
+ public static CollectionSerde<String, List<String>> STRING_LIST_SERDE = new CollectionSerde<>(new StringSerde(),
+ (Class)ArrayList.class);
private SpillableTestUtils()
{
@@ -77,34 +78,41 @@ public class SpillableTestUtils
}
}
+ protected static SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+
public static Slice getKeySlice(byte[] id, String key)
{
- return SliceUtils.concatenate(id, SERDE_STRING_SLICE.serialize(key));
+ buffer.writeBytes(id);
+ STRING_SERDE.serialize(key, buffer);
+ return buffer.toSlice();
}
public static Slice getKeySlice(byte[] id, int index, String key)
{
- return SliceUtils.concatenate(id,
- SliceUtils.concatenate(GPOUtils.serializeInt(index),
- SERDE_STRING_SLICE.serialize(key)));
+ buffer.writeBytes(id);
+ buffer.writeInt(index);
+ STRING_SERDE.serialize(key, buffer);
+ return buffer.toSlice();
}
public static void checkValue(SpillableStateStore store, long bucketId, String key,
byte[] prefix, String expectedValue)
{
- checkValue(store, bucketId, SliceUtils.concatenate(prefix, SERDE_STRING_SLICE.serialize(key)).buffer,
- expectedValue, 0, SERDE_STRING_SLICE);
+ buffer.writeBytes(prefix);
+ STRING_SERDE.serialize(key, buffer);
+ checkValue(store, bucketId, buffer.toSlice().toByteArray(), expectedValue, 0, STRING_SERDE);
}
public static void checkValue(SpillableStateStore store, long bucketId,
byte[] prefix, int index, List<String> expectedValue)
{
- checkValue(store, bucketId, SliceUtils.concatenate(prefix, GPOUtils.serializeInt(index)), expectedValue, 0,
- SERDE_STRING_LIST_SLICE);
+ buffer.writeBytes(prefix);
+ buffer.writeInt(index);
+ checkValue(store, bucketId, buffer.toSlice().toByteArray(), expectedValue, 0, STRING_LIST_SERDE);
}
- public static <T> void checkValue(SpillableStateStore store, long bucketId, byte[] bytes,
- T expectedValue, int offset, Serde<T, Slice> serde)
+ public static <T> void checkValue(SpillableStateStore store, long bucketId, byte[] bytes,
+ T expectedValue, int offset, Serde<T> serde)
{
Slice slice = store.getSync(bucketId, new Slice(bytes));
@@ -116,7 +124,7 @@ public class SpillableTestUtils
}
}
- T string = serde.deserialize(slice, new MutableInt(offset));
+ T string = serde.deserialize(new Input(slice.buffer, slice.offset + offset, slice.length));
Assert.assertEquals(expectedValue, string);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
index 8033a7d..a2cbb54 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueueTest.java
@@ -25,9 +25,6 @@ import org.junit.Test;
import com.google.common.collect.Sets;
-/**
- * Created by tfarkas on 6/4/16.
- */
public class TimeBasedPriorityQueueTest
{
@Test
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
new file mode 100644
index 0000000..007fab9
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/AffixSerdeTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class AffixSerdeTest
+{
+ @Test
+ public void simpleTest()
+ {
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ AffixSerde<String> serde = new AffixSerde<>(new byte[]{1, 2, 3}, new StringSerde(), new byte[]{9});
+
+ final String orgValue = "abc";
+ serde.serialize(orgValue, buffer);
+ Slice slice = buffer.toSlice();
+
+ String value = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+ Assert.assertEquals(orgValue, value);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
new file mode 100644
index 0000000..3b39d6c
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/BlockStreamTest.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class BlockStreamTest
+{
+ protected Random random = new Random();
+
+ @Test
+ public void testWindowedBlockStream()
+ {
+ WindowedBlockStream bs = new WindowedBlockStream();
+ List<byte[]> totalList = Lists.newArrayList();
+ List<Slice> slices = Lists.newArrayList();
+
+ for (int windowId = 0; windowId < 10; ++windowId) {
+ List<byte[]> list = generateList();
+ totalList.addAll(list);
+
+ bs.beginWindow(windowId);
+ writeToBlockStream(bs, list, slices);
+ bs.endWindow();
+
+ if (windowId % 2 != 0) {
+ verify(totalList, slices);
+
+ bs.completeWindow(windowId);
+ totalList.clear();
+ slices.clear();
+ }
+ }
+ }
+
+ @Test
+ public void testBlockStream()
+ {
+ BlockStream bs = new BlockStream();
+ List<byte[]> totalList = Lists.newArrayList();
+ List<Slice> slices = Lists.newArrayList();
+
+ for (int tryTime = 0; tryTime < 10; ++tryTime) {
+ List<byte[]> list = generateList();
+ totalList.addAll(list);
+
+ writeToBlockStream(bs, list, slices);
+
+ if (tryTime % 2 != 0) {
+ verify(totalList, slices);
+
+ bs.reset();
+ totalList.clear();
+ slices.clear();
+ }
+
+ }
+ }
+
+ private void writeToBlockStream(BlockStream bs, List<byte[]> list, List<Slice> slices)
+ {
+ for (byte[] bytes : list) {
+ int times = random.nextInt(100) + 1;
+ int remainLen = bytes.length;
+ int offset = 0;
+ while (times > 0 && remainLen > 0) {
+ int avgSubLen = remainLen / times;
+ times--;
+ if (avgSubLen == 0) {
+ bs.write(bytes, offset, remainLen);
+ break;
+ }
+
+ int writeLen = remainLen;
+ if (times != 0) {
+ int subLen = random.nextInt(avgSubLen * 2);
+ writeLen = Math.min(subLen, remainLen);
+ }
+ bs.write(bytes, offset, writeLen);
+
+ offset += writeLen;
+ remainLen -= writeLen;
+ }
+ slices.add(bs.toSlice());
+ }
+ }
+
+ private void verify(List<byte[]> list, List<Slice> slices)
+ {
+ //verify
+ Assert.assertTrue("size not equal.", list.size() == slices.size());
+
+ for (int i = 0; i < list.size(); ++i) {
+ byte[] bytes = list.get(i);
+ byte[] newBytes = slices.get(i).toByteArray();
+ if (!Arrays.equals(bytes, newBytes)) {
+ Assert.assertArrayEquals(bytes, newBytes);
+ }
+ }
+ }
+
+ private List<byte[]> generateList()
+ {
+ List<byte[]> list = Lists.newArrayList();
+ int size = random.nextInt(10000) + 1;
+ for (int i = 0; i < size; i++) {
+ list.add(generateByteArray());
+ }
+ return list;
+ }
+
+ protected byte[] generateByteArray()
+ {
+ int len = random.nextInt(10000) + 1;
+ byte[] bytes = new byte[len];
+ random.nextBytes(bytes);
+ return bytes;
+ }
+
+
+ @Test
+ public void testReleaseMemory()
+ {
+ WindowedBlockStream stream = new WindowedBlockStream();
+
+ byte[] data = new byte[2048];
+ final int loopPerWindow = 100;
+ long windowId = 0;
+
+ //fill data;
+ for (; windowId < 100; ++windowId) {
+ stream.beginWindow(windowId);
+ for (int i = 0; i < loopPerWindow; ++i) {
+ stream.write(data);
+ stream.toSlice();
+ }
+ stream.endWindow();
+ }
+
+ long capacity = stream.capacity();
+ stream.completeWindow(windowId);
+ Assert.assertTrue(capacity == stream.capacity());
+ Assert.assertTrue(0 == stream.size());
+
+ //release memory;
+ for (; windowId < 200; ++windowId) {
+ stream.beginWindow(windowId);
+ stream.endWindow();
+ }
+
+ //at least keep one block as current block
+ Assert.assertTrue(stream.capacity() == Block.DEFAULT_BLOCK_SIZE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
new file mode 100644
index 0000000..255d9c0
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerdeTest.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class CollectionSerdeTest
+{
+ @Test
+ public void testSerdeList()
+ {
+ CollectionSerde<String, List<String>> serdeList =
+ new CollectionSerde<>(new StringSerde(), (Class)ArrayList.class);
+
+ List<String> stringList = Lists.newArrayList("a", "b", "c");
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ serdeList.serialize(stringList, buffer);
+
+ Slice slice = buffer.toSlice();
+ List<String> deserializedList = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+ Assert.assertEquals(stringList, deserializedList);
+ }
+
+ @Test
+ public void testSerdeSet()
+ {
+ CollectionSerde<String, Set<String>> serdeSet =
+ new CollectionSerde<>(new StringSerde(), (Class)HashSet.class);
+
+ Set<String> stringList = Sets.newHashSet("a", "b", "c");
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ serdeSet.serialize(stringList, buffer);
+
+ Slice slice = buffer.toSlice();
+ Set<String> deserializedSet = serdeSet.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+ Assert.assertEquals(stringList, deserializedSet);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
new file mode 100644
index 0000000..34b7088
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/GenericSerdeTest.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * GenericSerde unit tests
+ */
+public class GenericSerdeTest
+{
+ public static class TestPojo
+ {
+ private TestPojo()
+ {
+ }
+
+ public TestPojo(int intValue, String stringValue)
+ {
+ this.intValue = intValue;
+ this.stringValue = stringValue;
+ }
+
+ @Override
+ public boolean equals(Object other)
+ {
+ TestPojo o = (TestPojo)other;
+ return intValue == o.intValue && stringValue.equals(o.stringValue);
+ }
+
+ int intValue;
+ String stringValue;
+ }
+
+ @Test
+ public void stringListTest()
+ {
+ GenericSerde<ArrayList> serdeList = new GenericSerde<>(ArrayList.class);
+
+ ArrayList<String> stringList = Lists.newArrayList("a", "b", "c");
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ serdeList.serialize(stringList, buffer);
+ Slice slice = buffer.toSlice();
+ List<String> deserializedList = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+ Assert.assertEquals(stringList, deserializedList);
+ }
+
+ @Test
+ public void pojoTest()
+ {
+ GenericSerde<TestPojo> serdePojo = new GenericSerde<>();
+ TestPojo pojo = new TestPojo(345, "xyz");
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ serdePojo.serialize(pojo, buffer);
+ Slice slice = buffer.toSlice();
+ TestPojo deserializedPojo = serdePojo.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+ Assert.assertEquals(pojo, deserializedPojo);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
new file mode 100644
index 0000000..104ff04
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PairSerdeTest.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class PairSerdeTest
+{
+ @Test
+ public void simpleSerdeTest()
+ {
+ PairSerde<String, Integer> serdePair = new PairSerde<>(new StringSerde(), new IntSerde());
+
+ Pair<String, Integer> pair = new ImmutablePair<>("abc", 123);
+
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ serdePair.serialize(pair, buffer);
+ Slice slice = buffer.toSlice();
+
+ Pair<String, Integer> deserializedPair = serdePair.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+ Assert.assertEquals(pair, deserializedPair);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
deleted file mode 100644
index 3cb5b65..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerdeTest.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-public class PassThruByteArraySerdeTest
-{
- @Rule
- public SerdeByteArrayToByteArrayTestWatcher testMeta = new SerdeByteArrayToByteArrayTestWatcher();
-
- public static class SerdeByteArrayToByteArrayTestWatcher extends TestWatcher
- {
- public PassThruByteArraySerde serde;
-
- @Override
- protected void starting(Description description)
- {
- this.serde = new PassThruByteArraySerde();
- super.starting(description);
- }
- }
-
- @Test
- public void simpleSerializeTest()
- {
- byte[] byteArray = new byte[]{1, 2, 3};
- byte[] serialized = testMeta.serde.serialize(byteArray);
-
- Assert.assertArrayEquals(byteArray, serialized);
- }
-
- @Test
- public void simpleDeserializeTest()
- {
- byte[] byteArray = new byte[]{1, 2, 3};
- byte[] serialized = testMeta.serde.deserialize(byteArray);
-
- Assert.assertArrayEquals(byteArray, serialized);
- }
-
- @Test
- public void simpleDeserializeOffsetTest()
- {
- byte[] byteArray = new byte[]{1, 2, 3};
- byte[] serialized = testMeta.serde.deserialize(byteArray, new MutableInt(0));
-
- Assert.assertArrayEquals(byteArray, serialized);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
deleted file mode 100644
index f6085f6..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSliceTest.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.netlet.util.Slice;
-
-public class SerdeCollectionSliceTest
-{
- @Test
- public void testSerdeList()
- {
- SerdeCollectionSlice<String, List<String>> serdeList =
- new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class<List<String>>)(Class)ArrayList.class);
-
- List<String> stringList = Lists.newArrayList("a", "b", "c");
-
- Slice slice = serdeList.serialize(stringList);
-
- List<String> deserializedList = serdeList.deserialize(slice);
-
- Assert.assertEquals(stringList, deserializedList);
- }
-
- @Test
- public void testSerdeSet()
- {
- SerdeCollectionSlice<String, Set<String>> serdeSet =
- new SerdeCollectionSlice<>(new SerdeStringSlice(), (Class<Set<String>>)(Class)HashSet.class);
-
- Set<String> stringList = Sets.newHashSet("a", "b", "c");
-
- Slice slice = serdeSet.serialize(stringList);
-
- Set<String> deserializedSet = serdeSet.deserialize(slice);
-
- Assert.assertEquals(stringList, deserializedSet);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
new file mode 100644
index 0000000..ee24557
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeGeneralTest.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerdeGeneralTest
+{
+ private final int charNum = 62;
+ private String[] testData = null;
+ private final Random random = new Random();
+
+ /**
+  * Rebuilds {@code testData} before each test method: between 1 and 10000
+  * strings, each between 1 and 10000 characters long, with every character
+  * drawn from {@link #getRandomChar()}.
+  *
+  * NOTE(review): {@code random} is created without a seed, so the generated
+  * data differs on every run and a failure may not be reproducible.
+  */
+ @Before
+ public void generateTestData()
+ {
+ int size = random.nextInt(10000) + 1;
+ testData = new String[size];
+ for (int i = 0; i < size; ++i) {
+ // nextInt(10000) + 1 keeps every string at least one character long
+ char[] chars = new char[random.nextInt(10000) + 1];
+ for (int j = 0; j < chars.length; ++j) {
+ chars[j] = getRandomChar();
+ }
+
+ testData[i] = new String(chars);
+ }
+ }
+
+ /**
+  * Returns a random alphanumeric character.
+  *
+  * The drawn value indexes a 62-character alphabet laid out as ten digits,
+  * twenty-six upper-case letters and twenty-six lower-case letters, so the
+  * start of each range is subtracted before the base character is added.
+  *
+  * @return one of '0'-'9', 'A'-'Z' or 'a'-'z'
+  */
+ private char getRandomChar()
+ {
+   int value = random.nextInt(charNum);
+   if (value < 10) {
+     return (char)(value + '0');
+   } else if (value < 36) {
+     // values 10..35 map onto 'A'..'Z'
+     return (char)(value - 10 + 'A');
+   }
+   // values 36..61 map onto 'a'..'z'
+   return (char)(value - 36 + 'a');
+ }
+
+ @Test
+ public void testSerdeInt()
+ {
+ IntSerde intSerde = new IntSerde();
+
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+ int value = 123;
+ intSerde.serialize(value, buffer);
+
+ Slice slice = buffer.toSlice();
+
+ int deserializedValue = intSerde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+
+ Assert.assertEquals(value, deserializedValue);
+ }
+
+ @Test
+ public void testSerdeString()
+ {
+ testSerde(testData, new StringSerde(), new StringSerdeVerifier());
+ }
+
+ @Test
+ public void testSerdeArray()
+ {
+ testSerde(testData, ArraySerde.newSerde(new StringSerde(), String.class), new StringArraySerdeVerifier());
+ }
+
+
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void testSerdeCollection()
+ {
+ CollectionSerde<String, List<String>> listSerde = new CollectionSerde<>(new StringSerde(), (Class)ArrayList.class);
+ testSerde(testData, listSerde, new StringListSerdeVerifier());
+ }
+
+
+ /**
+  * Runs a verifier through ten consecutive windows (ids 0 to 9) against one
+  * SerializationBuffer built on a new WindowedBlockStream, then releases the
+  * buffer.
+  *
+  * @param strs the strings handed to the verifier
+  * @param serde the serde under test
+  * @param verifier performs the serialize/deserialize round trip and asserts on it
+  */
+ public <T> void testSerde(String[] strs, Serde<T> serde, SerdeVerifier<T> verifier)
+ {
+ SerializationBuffer buffer = new SerializationBuffer(new WindowedBlockStream());
+
+ for (int i = 0; i < 10; ++i) {
+ buffer.beginWindow(i);
+ verifier.verifySerde(strs, serde, buffer);
+ buffer.endWindow();
+ // completeWindow runs only for ids divisible by 3 and reset only for ids
+ // divisible by 4, so windows end with neither, either, or both calls
+ if (i % 3 == 0) {
+ buffer.completeWindow(i);
+ }
+ if (i % 4 == 0) {
+ buffer.reset();
+ }
+ }
+ buffer.release();
+ }
+
+ public interface SerdeVerifier<T>
+ {
+ void verifySerde(String[] datas, Serde<T> serde, SerializationBuffer buffer);
+ }
+
+ public static class StringSerdeVerifier implements SerdeVerifier<String>
+ {
+ @Override
+ public void verifySerde(String[] datas, Serde<String> serde, SerializationBuffer buffer)
+ {
+ for (String str : datas) {
+ serde.serialize(str, buffer);
+ Slice slice = buffer.toSlice();
+ Assert.assertTrue("serialize failed, String: " + str, str.equals(serde.deserialize(new Input(slice.buffer, slice.offset, slice.length))));
+ }
+ }
+ }
+
+
+ public static class StringArraySerdeVerifier implements SerdeVerifier<String[]>
+ {
+ @Override
+ public void verifySerde(String[] datas, Serde<String[]> serde, SerializationBuffer buffer)
+ {
+ serde.serialize(datas, buffer);
+ Slice slice = buffer.toSlice();
+ String[] newStrs = serde.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+ Assert.assertArrayEquals("serialize array failed.", datas, newStrs);
+ }
+ }
+
+ public static class StringListSerdeVerifier implements SerdeVerifier<List<String>>
+ {
+ @Override
+ public void verifySerde(String[] datas, Serde<List<String>> serdeList, SerializationBuffer buffer)
+ {
+ List<String> list = Arrays.asList(datas);
+
+ serdeList.serialize(list, buffer);
+ Slice slice = buffer.toSlice();
+ List<String> newStrs = serdeList.deserialize(new Input(slice.buffer, slice.offset, slice.length));
+ Assert.assertArrayEquals("serialize list failed.", datas, newStrs.toArray(new String[0]));
+
+ buffer.reset();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
deleted file mode 100644
index b780f66..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSliceTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * SerdeKryoSlice unit tests
- */
-public class SerdeKryoSliceTest
-{
- public static class TestPojo
- {
- private TestPojo()
- {
- }
-
- public TestPojo(int intValue, String stringValue)
- {
- this.intValue = intValue;
- this.stringValue = stringValue;
- }
-
- @Override
- public boolean equals(Object other)
- {
- TestPojo o = (TestPojo)other;
- return intValue == o.intValue && stringValue.equals(o.stringValue);
- }
-
- int intValue;
- String stringValue;
- }
-
- @Test
- public void stringListTest()
- {
- SerdeKryoSlice<ArrayList> serdeList = new SerdeKryoSlice<>(ArrayList.class);
-
- ArrayList<String> stringList = Lists.newArrayList("a", "b", "c");
- Slice slice = serdeList.serialize(stringList);
- List<String> deserializedList = serdeList.deserialize(slice);
- Assert.assertEquals(stringList, deserializedList);
- }
-
- @Test
- public void pojoTest()
- {
- SerdeKryoSlice<TestPojo> serdePojo = new SerdeKryoSlice<>();
- TestPojo pojo = new TestPojo(345, "xyz");
- Slice slice = serdePojo.serialize(pojo);
- TestPojo deserializedPojo = serdePojo.deserialize(slice);
- Assert.assertEquals(pojo, deserializedPojo);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java b/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
deleted file mode 100644
index 6684a9f..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSliceTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-
-import com.datatorrent.netlet.util.Slice;
-
-public class SerdePairSliceTest
-{
- @Test
- public void simpleSerdeTest()
- {
- SerdePairSlice<String, Integer> serdePair = new SerdePairSlice<>(new SerdeStringSlice(), new SerdeIntSlice());
-
- Pair<String, Integer> pair = new ImmutablePair<>("abc", 123);
-
- Slice slice = serdePair.serialize(pair);
-
- Pair<String, Integer> deserializedPair = serdePair.deserialize(slice);
-
- Assert.assertEquals(pair, deserializedPair);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
index 3b7789c..a44e454 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/SpillableWindowedStorageTest.java
@@ -47,8 +47,14 @@ public class SpillableWindowedStorageTest
Window window2 = new Window.TimeWindow<>(1010, 10);
Window window3 = new Window.TimeWindow<>(1020, 10);
storage.setSpillableComplexComponent(sccImpl);
- storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
+ /*
+ * storage.setup() will create Spillable Data Structures
+ * storage.getSpillableComplexComponent().setup() will setup these Data Structures.
+ * So storage.setup() should be called before storage.getSpillableComplexComponent().setup()
+ */
storage.setup(testMeta.operatorContext);
+ storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
sccImpl.beginWindow(1000);
storage.put(window1, 1);
@@ -103,8 +109,15 @@ public class SpillableWindowedStorageTest
Window window2 = new Window.TimeWindow<>(1010, 10);
Window window3 = new Window.TimeWindow<>(1020, 10);
storage.setSpillableComplexComponent(sccImpl);
- storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
+ /*
+ * storage.setup() will create Spillable Data Structures
+ * storage.getSpillableComplexComponent().setup() will setup these Data Structures.
+ * So storage.setup() should be called before storage.getSpillableComplexComponent().setup()
+ */
storage.setup(testMeta.operatorContext);
+ storage.getSpillableComplexComponent().setup(testMeta.operatorContext);
+
sccImpl.beginWindow(1000);
storage.put(window1, "x", 1);
[3/3] apex-malhar git commit: APEXMALHAR-2190 #resolve #comment Use
reusable buffer for serialization in spillable data structures
Posted by th...@apache.org.
APEXMALHAR-2190 #resolve #comment Use reusable buffer for serialization in spillable data structures
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2fa1e6b1
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2fa1e6b1
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2fa1e6b1
Branch: refs/heads/master
Commit: 2fa1e6b16312eecdd074520d431902f11d555221
Parents: 3799157
Author: brightchen <br...@datatorrent.com>
Authored: Mon Aug 15 17:46:27 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Oct 24 12:52:08 2016 -0700
----------------------------------------------------------------------
.../spillable/SpillableBenchmarkApp.java | 69 +++++
.../spillable/SpillableTestInputOperator.java | 46 ++++
.../spillable/SpillableTestOperator.java | 189 ++++++++++++++
.../spillable/SpillableBenchmarkAppTester.java | 73 ++++++
.../spillable/SpillableDSBenchmarkTest.java | 171 +++++++++++++
.../state/ManagedStateBenchmarkAppTest.java | 101 ++++++++
.../state/ManagedStateBenchmarkAppTester.java | 101 --------
benchmark/src/test/resources/log4j.properties | 2 +
.../state/managed/AbstractManagedStateImpl.java | 34 ++-
.../apex/malhar/lib/state/managed/Bucket.java | 85 +++++--
.../lib/state/managed/BucketProvider.java | 40 +++
.../state/spillable/SpillableArrayListImpl.java | 17 +-
.../SpillableArrayListMultimapImpl.java | 53 ++--
.../spillable/SpillableComplexComponent.java | 29 +--
.../SpillableComplexComponentImpl.java | 64 +++--
.../lib/state/spillable/SpillableMapImpl.java | 44 ++--
.../lib/state/spillable/SpillableSetImpl.java | 45 +---
.../spillable/SpillableSetMultimapImpl.java | 45 ++--
.../state/spillable/SpillableStateStore.java | 3 +-
.../state/spillable/WindowBoundedMapCache.java | 5 +-
.../inmem/InMemSpillableStateStore.java | 26 ++
.../utils/serde/AffixKeyValueSerdeManager.java | 76 ++++++
.../apex/malhar/lib/utils/serde/AffixSerde.java | 68 +++++
.../apex/malhar/lib/utils/serde/ArraySerde.java | 97 ++++++++
.../apex/malhar/lib/utils/serde/Block.java | 217 ++++++++++++++++
.../lib/utils/serde/BlockReleaseStrategy.java | 47 ++++
.../malhar/lib/utils/serde/BlockStream.java | 179 +++++++++++++
.../malhar/lib/utils/serde/BufferSlice.java | 100 ++++++++
.../malhar/lib/utils/serde/CollectionSerde.java | 97 ++++++++
.../serde/DefaultBlockReleaseStrategy.java | 96 +++++++
.../malhar/lib/utils/serde/GenericSerde.java | 81 ++++++
.../apex/malhar/lib/utils/serde/IntSerde.java | 45 ++++
.../utils/serde/KeyValueByteStreamProvider.java | 37 +++
.../lib/utils/serde/KeyValueSerdeManager.java | 86 +++++++
.../apex/malhar/lib/utils/serde/LongSerde.java | 45 ++++
.../apex/malhar/lib/utils/serde/PairSerde.java | 73 ++++++
.../lib/utils/serde/PassThruByteArraySerde.java | 51 ----
.../serde/PassThruByteArraySliceSerde.java | 61 -----
.../lib/utils/serde/PassThruSliceSerde.java | 32 ++-
.../apex/malhar/lib/utils/serde/Serde.java | 41 +--
.../lib/utils/serde/SerdeCollectionSlice.java | 120 ---------
.../malhar/lib/utils/serde/SerdeIntSlice.java | 54 ----
.../malhar/lib/utils/serde/SerdeKryoSlice.java | 100 --------
.../malhar/lib/utils/serde/SerdeLongSlice.java | 54 ----
.../malhar/lib/utils/serde/SerdePairSlice.java | 89 -------
.../lib/utils/serde/SerdeStringSlice.java | 55 ----
.../lib/utils/serde/SerializationBuffer.java | 130 ++++++++++
.../apex/malhar/lib/utils/serde/SliceUtils.java | 10 +
.../malhar/lib/utils/serde/StringSerde.java | 45 ++++
.../lib/utils/serde/WindowCompleteListener.java | 29 +++
.../lib/utils/serde/WindowedBlockStream.java | 249 +++++++++++++++++++
.../impl/SpillableSessionWindowedStorage.java | 3 +-
.../impl/SpillableWindowedKeyedStorage.java | 28 +--
.../impl/SpillableWindowedPlainStorage.java | 18 +-
.../com/datatorrent/lib/util/TestUtils.java | 3 +-
.../lib/state/managed/DefaultBucketTest.java | 48 +++-
.../state/managed/ManagedStateTestUtils.java | 3 +-
.../spillable/SpillableArrayListImplTest.java | 12 +-
.../SpillableArrayListMultimapImplTest.java | 30 ++-
.../SpillableComplexComponentImplTest.java | 6 +-
.../state/spillable/SpillableMapImplTest.java | 39 ++-
.../state/spillable/SpillableSetImplTest.java | 4 +-
.../spillable/SpillableSetMultimapImplTest.java | 18 +-
.../lib/state/spillable/SpillableTestUtils.java | 46 ++--
.../spillable/TimeBasedPriorityQueueTest.java | 3 -
.../malhar/lib/utils/serde/AffixSerdeTest.java | 43 ++++
.../malhar/lib/utils/serde/BlockStreamTest.java | 179 +++++++++++++
.../lib/utils/serde/CollectionSerdeTest.java | 68 +++++
.../lib/utils/serde/GenericSerdeTest.java | 84 +++++++
.../malhar/lib/utils/serde/PairSerdeTest.java | 48 ++++
.../utils/serde/PassThruByteArraySerdeTest.java | 72 ------
.../utils/serde/SerdeCollectionSliceTest.java | 65 -----
.../lib/utils/serde/SerdeGeneralTest.java | 169 +++++++++++++
.../lib/utils/serde/SerdeKryoSliceTest.java | 79 ------
.../lib/utils/serde/SerdePairSliceTest.java | 44 ----
.../window/SpillableWindowedStorageTest.java | 17 +-
76 files changed, 3570 insertions(+), 1265 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
new file mode 100644
index 0000000..e2fe8bb
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkApp.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.ApplicationAnnotation;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+/**
+ * Benchmark application that connects a SpillableTestInputOperator to a
+ * SpillableTestOperator backed by a ManagedStateSpillableStateStore.
+ */
+@ApplicationAnnotation(name = "SpillableBenchmarkApp")
+public class SpillableBenchmarkApp implements StreamingApplication
+{
+ // Configuration key read by getStoreBasePath() to locate the store's base path
+ protected final String PROP_STORE_PATH = "dt.application.SpillableBenchmarkApp.storeBasePath";
+
+ /**
+  * Adds the input and test operators to the DAG and connects them with a
+  * CONTAINER_LOCAL stream.
+  *
+  * @param dag the DAG to populate
+  * @param conf configuration used to resolve the store base path
+  */
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ // Create the input operator that generates the test tuples
+ SpillableTestInputOperator input = new SpillableTestInputOperator();
+ input.batchSize = 100;
+ // 0 means the input operator does not sleep between batches
+ input.sleepBetweenBatch = 0;
+ input = dag.addOperator("input", input);
+
+ SpillableTestOperator testOperator = new SpillableTestOperator();
+ testOperator.store = createStore(conf);
+ // NOTE(review): -1 presumably disables the simulated failure in processTuple - confirm
+ testOperator.shutdownCount = -1;
+ testOperator = dag.addOperator("test", testOperator );
+
+
+ // Connect ports
+ dag.addStream("stream", input.output, testOperator.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
+ }
+
+
+ /**
+  * Creates the state store and points its file access at the configured base path.
+  *
+  * @param conf configuration passed on to getStoreBasePath()
+  * @return a new ManagedStateSpillableStateStore
+  */
+ public ManagedStateSpillableStateStore createStore(Configuration conf)
+ {
+ String basePath = getStoreBasePath(conf);
+ ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+ ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath(basePath);
+ return store;
+ }
+
+ /**
+  * Reads the store base path from the configuration.
+  *
+  * @param conf configuration expected to contain PROP_STORE_PATH
+  * @return the configured base path
+  * @throws NullPointerException if the property is not set
+  */
+ public String getStoreBasePath(Configuration conf)
+ {
+ return Preconditions.checkNotNull(conf.get(PROP_STORE_PATH),
+ "base path should be specified in the properties.xml");
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
new file mode 100644
index 0000000..2e33721
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestInputOperator.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Input operator for the spillable benchmark. Each call to emitTuples() emits
+ * batchSize strings, each holding the next value of a running counter, and
+ * then optionally sleeps.
+ */
+public class SpillableTestInputOperator extends BaseOperator implements InputOperator
+{
+  public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
+  // Running counter; its incremented value is the text of each emitted tuple
+  public long count = 0;
+  // Number of tuples emitted per emitTuples() call
+  public int batchSize = 100;
+  // Value passed to Thread.sleep after each batch; no sleep when not positive
+  public int sleepBetweenBatch = 1;
+
+  /**
+   * Emits one batch of tuples and sleeps afterwards when sleepBetweenBatch is
+   * positive. If the sleep is interrupted the method returns normally with the
+   * thread's interrupt flag set again.
+   */
+  @Override
+  public void emitTuples()
+  {
+    for (int i = 0; i < batchSize; ++i) {
+      output.emit("" + ++count);
+    }
+    if (sleepBetweenBatch > 0) {
+      try {
+        Thread.sleep(sleepBetweenBatch);
+      } catch (InterruptedException e) {
+        // Restore the flag cleared by the exception so callers can still see the interrupt
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
----------------------------------------------------------------------
diff --git a/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
new file mode 100644
index 0000000..3c5bf71
--- /dev/null
+++ b/benchmark/src/main/java/com/datatorrent/benchmark/spillable/SpillableTestOperator.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableArrayListMultimapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.LongSerde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.Operator.ShutdownException;
+import com.datatorrent.common.util.BaseOperator;
+
+public class SpillableTestOperator extends BaseOperator implements Operator.CheckpointNotificationListener
+{
+ private static final Logger logger = LoggerFactory.getLogger(SpillableTestOperator.class);
+
+ public static final byte[] ID1 = new byte[] {(byte)1};
+ public static final byte[] ID2 = new byte[] {(byte)2};
+ public static final byte[] ID3 = new byte[] {(byte)3};
+
+ public SpillableArrayListMultimapImpl<String, String> multiMap;
+
+ public ManagedStateSpillableStateStore store;
+
+ public long totalCount = 0;
+ public transient long countInWindow;
+ public long minWinId = -1;
+ public long committedWinId = -1;
+ public long windowId;
+
+ public SpillableMapImpl<Long, Long> windowToCount;
+
+ public long shutdownCount = -1;
+
+ public static Throwable errorTrace;
+
+ public final transient DefaultInputPort<String> input = new DefaultInputPort<String>()
+ {
+ @Override
+ public void process(String tuple)
+ {
+ processTuple(tuple);
+ }
+ };
+
+ public void processTuple(String tuple)
+ {
+ if (++totalCount == shutdownCount) {
+ throw new RuntimeException("Test recovery. count = " + totalCount);
+ }
+ countInWindow++;
+ multiMap.put("" + windowId, tuple);
+ }
+
+ /**
+  * Creates the two spillable structures if they are not already present, sets
+  * up the store and the multimap, and then runs checkData().
+  *
+  * @param context the operator context passed on to the store and the multimap
+  */
+ @Override
+ public void setup(OperatorContext context)
+ {
+ super.setup(context);
+ // Only create the structures when the fields are still null
+ if (windowToCount == null) {
+ windowToCount = createWindowToCountMap(store);
+ }
+ if (multiMap == null) {
+ multiMap = createMultimap(store);
+ }
+
+ store.setup(context);
+ multiMap.setup(context);
+ // NOTE(review): windowToCount receives no setup(context) call here, unlike
+ // multiMap - confirm that this is intentional
+
+ checkData();
+ }
+
+ public void checkData()
+ {
+ long startTime = System.currentTimeMillis();
+ logger.debug("check data: totalCount: {}; minWinId: {}; committedWinId: {}; curWinId: {}", totalCount,
+ this.minWinId, committedWinId, this.windowId);
+ for (long winId = Math.max(committedWinId + 1, minWinId); winId < this.windowId; ++winId) {
+ Long count = this.windowToCount.get(winId);
+ SpillableArrayListImpl<String> datas = (SpillableArrayListImpl<String>)multiMap.get("" + winId);
+ String msg;
+ if (((datas == null && count != null) || (datas != null && count == null)) || (datas == null && count == null)) {
+ msg = "Invalid data/count. datas: " + datas + "; count: " + count;
+ logger.error(msg);
+ errorTrace = new RuntimeException(msg);
+ throw new ShutdownException();
+ } else {
+ int dataSize = datas.size();
+ if ((long)count != (long)dataSize) {
+ msg = String.format("data size not equal: window Id: %d; datas size: %d; count: %d", winId, dataSize, count);
+ logger.error(msg);
+ errorTrace = new RuntimeException(msg);
+ throw new ShutdownException();
+ }
+ }
+ }
+ logger.info("check data took {} millis.", System.currentTimeMillis() - startTime);
+ }
+
+
+ /**
+ * {@inheritDoc}
+ *
+ * Forwards the window start to the store and the multimap, records the first
+ * window id seen in minWinId, and resets the per-window tuple counter.
+ */
+ @Override
+ public void beginWindow(long windowId)
+ {
+ store.beginWindow(windowId);
+ multiMap.beginWindow(windowId);
+ // NOTE(review): windowToCount receives no beginWindow call here although
+ // endWindow() calls windowToCount.endWindow() - confirm that this is intentional
+ // minWinId starts at -1, so this assigns it only while it is still negative
+ if (minWinId < 0) {
+ minWinId = windowId;
+ }
+
+ this.windowId = windowId;
+ countInWindow = 0;
+ }
+
+ /**
+  * Ends the window on the multimap, stores this window's tuple count in
+  * windowToCount, ends the window on windowToCount and on the store, and runs
+  * checkData() when the window id is divisible by 10.
+  */
+ @Override
+ public void endWindow()
+ {
+ multiMap.endWindow();
+ // Record the count before windowToCount's own endWindow() is called
+ windowToCount.put(windowId, countInWindow);
+ windowToCount.endWindow();
+ store.endWindow();
+
+ if (windowId % 10 == 0) {
+ checkData();
+ }
+ }
+
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ store.beforeCheckpoint(windowId);
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @Override
+ public void committed(long windowId)
+ {
+ this.committedWinId = windowId;
+ store.committed(windowId);
+ }
+
+ /**
+  * Builds the String-to-String list multimap on the given store, using
+  * identifier ID1, bucket 0 and StringSerde for both keys and values.
+  *
+  * @param store the state store backing the multimap
+  * @return a new multimap instance
+  */
+ public static SpillableArrayListMultimapImpl<String, String> createMultimap(SpillableStateStore store)
+ {
+   StringSerde keySerde = new StringSerde();
+   StringSerde valueSerde = new StringSerde();
+   return new SpillableArrayListMultimapImpl<String, String>(store, ID1, 0L, keySerde, valueSerde);
+ }
+
+ /**
+  * Builds a String-to-String map on the given store, using identifier ID2,
+  * bucket 0 and StringSerde for both keys and values.
+  *
+  * @param store the state store backing the map
+  * @return a new map instance
+  */
+ public static SpillableMapImpl<String, String> createMap(SpillableStateStore store)
+ {
+   StringSerde keySerde = new StringSerde();
+   StringSerde valueSerde = new StringSerde();
+   return new SpillableMapImpl<String, String>(store, ID2, 0L, keySerde, valueSerde);
+ }
+
+ /**
+  * Builds the Long-to-Long map on the given store, using identifier ID3,
+  * bucket 0 and LongSerde for both keys and values.
+  *
+  * @param store the state store backing the map
+  * @return a new map instance
+  */
+ public static SpillableMapImpl<Long, Long> createWindowToCountMap(SpillableStateStore store)
+ {
+   LongSerde keySerde = new LongSerde();
+   LongSerde valueSerde = new LongSerde();
+   return new SpillableMapImpl<Long, Long>(store, ID3, 0L, keySerde, valueSerde);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
new file mode 100644
index 0000000..7f94079
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableBenchmarkAppTester.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+
+/**
+ * Runs SpillableBenchmarkApp in local mode and fails when
+ * SpillableTestOperator has recorded an error in its static errorTrace field.
+ */
+public class SpillableBenchmarkAppTester extends SpillableBenchmarkApp
+{
+ private static final Logger logger = LoggerFactory.getLogger(SpillableBenchmarkAppTester.class);
+ // Store base path returned by getStoreBasePath() in place of the configured property
+ public static final String basePath = "target/temp";
+ /**
+  * Populates the DAG, runs it on a local controller, shuts the controller
+  * down and then checks SpillableTestOperator.errorTrace.
+  */
+ @Test
+ public void test() throws Exception
+ {
+ Configuration conf = new Configuration(false);
+
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ super.populateDAG(dag, conf);
+
+ // NOTE(review): the DAG is populated directly above, and an application
+ // with an empty populateDAG is handed to prepareDAG - presumably so the
+ // operators are not added a second time; confirm
+ StreamingApplication app = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ }
+ };
+
+ lma.prepareDAG(app, conf);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ // NOTE(review): 60000 is presumably a run duration in milliseconds - confirm
+ lc.run(60000);
+
+ lc.shutdown();
+
+ // assertNull always fails inside this branch; it reports the recorded message
+ if (SpillableTestOperator.errorTrace != null) {
+ logger.error("Error.", SpillableTestOperator.errorTrace);
+ Assert.assertNull(SpillableTestOperator.errorTrace.getMessage(), SpillableTestOperator.errorTrace);
+ }
+ }
+
+ /**
+  * Returns the fixed test path instead of reading the configuration.
+  */
+ @Override
+ public String getStoreBasePath(Configuration conf)
+ {
+ return basePath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
new file mode 100644
index 0000000..7e64c5f
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/spillable/SpillableDSBenchmarkTest.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.spillable;
+
+import java.util.Random;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableMapImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableTestUtils;
+import org.apache.apex.malhar.lib.state.spillable.managed.ManagedStateSpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.StringSerde;
+
+import com.datatorrent.lib.fileaccess.TFileImpl;
+
+
+/**
+ * Benchmark that performs a long sequence of random puts on a {@link SpillableMapImpl}
+ * backed by a {@link ManagedStateSpillableStateStore}, while driving the window,
+ * checkpoint and commit callbacks of the store by hand. Throughput and JVM memory
+ * figures are logged periodically.
+ */
+public class SpillableDSBenchmarkTest
+{
+ private static final Logger logger = LoggerFactory.getLogger(SpillableDSBenchmarkTest.class);
+ // total number of put operations performed by testSpillableMap()
+ protected static final int loopCount = 100000000;
+ protected static final long oneMB = 1024 * 1024;
+ // number of distinct random keys / values generated in setup()
+ protected static final int keySize = 500000;
+ protected static final int valueSize = 100000;
+ // length, in characters, of each generated key / value
+ protected static final int maxKeyLength = 100;
+ protected static final int maxValueLength = 1000;
+
+ // a window is ended and a new one begun every tuplesPerWindow puts
+ protected static final int tuplesPerWindow = 10000;
+ // beforeCheckpoint() is invoked every (tuplesPerWindow * checkPointWindows) puts
+ protected static final int checkPointWindows = 10;
+ // committed() is invoked for the window that lies commitDelays windows behind the current one
+ protected static final int commitDelays = 100;
+
+ protected final transient Random random = new Random();
+ protected String[] keys;
+ protected String[] values;
+
+ @Rule
+ public SpillableTestUtils.TestMeta testMeta = new SpillableTestUtils.TestMeta();
+
+
+ /**
+ * Generates the pools of random keys and values that the benchmark draws from.
+ */
+ @Before
+ public void setup()
+ {
+ keys = new String[keySize];
+ for (int i = 0; i < keys.length; ++i) {
+ keys[i] = this.randomString(maxKeyLength);
+ }
+
+ values = new String[valueSize];
+ for (int i = 0; i < values.length; ++i) {
+ values[i] = this.randomString(maxValueLength);
+ }
+ }
+
+ /**
+ * Puts {@link #loopCount} random entries into a spillable map, rotating windows and
+ * triggering checkpoint / commit callbacks on the store along the way, and logs the
+ * achieved throughput.
+ */
+ @Test
+ public void testSpillableMap()
+ {
+ byte[] ID1 = new byte[]{(byte)1};
+ ManagedStateSpillableStateStore store = new ManagedStateSpillableStateStore();
+ ((TFileImpl.DTFileImpl)store.getFileAccess()).setBasePath("target/temp");
+
+ StringSerde keySerde = createKeySerde();
+ Serde<String> valueSerde = createValueSerde();
+
+ SpillableMapImpl<String, String> map = new SpillableMapImpl<String, String>(store, ID1, 0L, keySerde, valueSerde);
+ store.setup(testMeta.operatorContext);
+ map.setup(testMeta.operatorContext);
+
+ final long startTime = System.currentTimeMillis();
+
+ long windowId = 0;
+ store.beginWindow(++windowId);
+ map.beginWindow(windowId);
+
+ int outputTimes = 0;
+ for (int i = 0; i < loopCount; ++i) {
+ putEntry(map);
+
+ // window boundary; note that this also fires for i == 0, right after the first put
+ if (i % tuplesPerWindow == 0) {
+ map.endWindow();
+ store.endWindow();
+
+ if (i % (tuplesPerWindow * checkPointWindows) == 0) {
+ store.beforeCheckpoint(windowId);
+
+ if (windowId > commitDelays) {
+ store.committed(windowId - commitDelays);
+ }
+ }
+
+ //next window
+ store.beginWindow(++windowId);
+ map.beginWindow(windowId);
+ }
+
+ long spentTime = System.currentTimeMillis() - startTime;
+ // report progress roughly once per 5000 ms of elapsed time; spentTime is always > 0 inside the branch
+ if (spentTime > outputTimes * 5000) {
+ ++outputTimes;
+ // 1000L forces the multiplication into long arithmetic: i * 1000 overflows int once i exceeds ~2.1 million
+ logger.info("Total Statistics: Spent {} mills for {} operation. average/second: {}", spentTime, i, i * 1000L / spentTime);
+ checkEnvironment();
+ }
+ }
+ long spentTime = System.currentTimeMillis() - startTime;
+
+ // the "average" logged here is operations per millisecond
+ logger.info("Spent {} mills for {} operation. average: {}", spentTime, loopCount,
+ loopCount / spentTime);
+ }
+
+
+ /**
+ * Puts one randomly chosen key / value pair from the pre-generated pools into the map.
+ *
+ * @param map the map to write to
+ */
+ public void putEntry(SpillableMapImpl<String, String> map)
+ {
+ map.put(keys[random.nextInt(keys.length)], values[random.nextInt(values.length)]);
+ }
+
+ // alphabet for random strings: digits, upper-case and lower-case ASCII letters (62 characters)
+ public static final String characters = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
+
+ // scratch buffer shared by all randomString() calls; sized for the longest string the benchmark generates
+ protected static final char[] text = new char[Math.max(maxKeyLength, maxValueLength)];
+
+ /**
+ * Builds a random string from {@link #characters}. The shared static buffer {@link #text}
+ * is reused on every call, so this method must not be called concurrently.
+ *
+ * @param length number of characters to generate; must not exceed {@code text.length}
+ * @return a new random string of the requested length
+ */
+ public String randomString(int length)
+ {
+ for (int i = 0; i < length; i++) {
+ text[i] = characters.charAt(random.nextInt(characters.length()));
+ }
+ return new String(text, 0, length);
+ }
+
+ /**
+ * Logs the free, allocated and maximum JVM memory in megabytes and fails the test when
+ * the allocated memory has reached the maximum while less than 10 MB is free.
+ */
+ public void checkEnvironment()
+ {
+ Runtime runtime = Runtime.getRuntime();
+
+ long maxMemory = runtime.maxMemory() / oneMB;
+ long allocatedMemory = runtime.totalMemory() / oneMB;
+ long freeMemory = runtime.freeMemory() / oneMB;
+
+ logger.info("freeMemory: {}M; allocatedMemory: {}M; maxMemory: {}M", freeMemory,
+ allocatedMemory, maxMemory);
+
+ Assert.assertFalse("Run out of memory.", allocatedMemory == maxMemory && freeMemory < 10);
+ }
+
+ /**
+ * @return the serde used for the map keys; protected so that subclasses can supply another one
+ */
+ protected StringSerde createKeySerde()
+ {
+ return new StringSerde();
+ }
+
+ /**
+ * @return the serde used for the map values; protected so that subclasses can supply another one
+ */
+ protected Serde<String> createValueSerde()
+ {
+ return new StringSerde();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
new file mode 100644
index 0000000..4792843
--- /dev/null
+++ b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.benchmark.state;
+
+import java.io.File;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileUtil;
+
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
+
+/**
+ * This is not really a unit test, but a benchmark runner.
+ * It is provided so that developers can conveniently run the benchmark in a local IDE environment.
+ *
+ */
+public class ManagedStateBenchmarkAppTest extends ManagedStateBenchmarkApp
+{
+ // path returned by getStoreBasePath(); deleted before every test
+ public static final String basePath = "target/temp";
+
+ /**
+ * Deletes {@link #basePath} so that each run starts without files left by a previous run.
+ */
+ @Before
+ public void before()
+ {
+ FileUtil.fullyDelete(new File(basePath));
+ }
+
+ /** Runs the benchmark with {@code ExecMode.UPDATESYNC}. */
+ @Test
+ public void testUpdateSync() throws Exception
+ {
+ test(ExecMode.UPDATESYNC);
+ }
+
+ /** Runs the benchmark with {@code ExecMode.UPDATEASYNC}. */
+ @Test
+ public void testUpdateAsync() throws Exception
+ {
+ test(ExecMode.UPDATEASYNC);
+ }
+
+ /** Runs the benchmark with {@code ExecMode.INSERT}. */
+ @Test
+ public void testInsert() throws Exception
+ {
+ test(ExecMode.INSERT);
+ }
+
+ /**
+ * Populates the DAG from the parent application, sets the execution mode on the store
+ * operator and runs the DAG on a local cluster.
+ *
+ * @param exeMode the execution mode assigned to {@code storeOperator.execMode}
+ * @throws Exception if preparing or running the local cluster fails
+ */
+ public void test(ExecMode exeMode) throws Exception
+ {
+ Configuration conf = new Configuration(false);
+
+ LocalMode lma = LocalMode.newInstance();
+ DAG dag = lma.getDAG();
+
+ super.populateDAG(dag, conf);
+ // NOTE(review): storeOperator is presumably a field inherited from ManagedStateBenchmarkApp
+ // that populateDAG() initializes - confirm
+ storeOperator.execMode = exeMode;
+
+ // the DAG is already populated above, so the application handed to prepareDAG adds nothing
+ StreamingApplication app = new StreamingApplication()
+ {
+ @Override
+ public void populateDAG(DAG dag, Configuration conf)
+ {
+ }
+ };
+
+ lma.prepareDAG(app, conf);
+
+ // Create local cluster
+ final LocalMode.Controller lc = lma.getController();
+ // NOTE(review): the argument is presumably a run time in milliseconds (five minutes) - confirm
+ lc.run(300000);
+
+ lc.shutdown();
+ }
+
+
+
+ /**
+ * @param conf the application configuration (not used by this override)
+ * @return {@link #basePath}
+ */
+ @Override
+ public String getStoreBasePath(Configuration conf)
+ {
+ return basePath;
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
----------------------------------------------------------------------
diff --git a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java b/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
deleted file mode 100644
index 4435aad..0000000
--- a/benchmark/src/test/java/com/datatorrent/benchmark/state/ManagedStateBenchmarkAppTester.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.benchmark.state;
-
-import java.io.File;
-
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileUtil;
-
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.StreamingApplication;
-import com.datatorrent.benchmark.state.StoreOperator.ExecMode;
-
-/**
- * This is not a really unit test, but in fact a benchmark runner.
- * Provides this class to give developers the convenience to run in local IDE environment.
- *
- */
-public class ManagedStateBenchmarkAppTester extends ManagedStateBenchmarkApp
-{
- public static final String basePath = "target/temp";
-
- @Before
- public void before()
- {
- FileUtil.fullyDelete(new File(basePath));
- }
-
- @Test
- public void testUpdateSync() throws Exception
- {
- test(ExecMode.UPDATESYNC);
- }
-
- @Test
- public void testUpdateAsync() throws Exception
- {
- test(ExecMode.UPDATEASYNC);
- }
-
- @Test
- public void testInsert() throws Exception
- {
- test(ExecMode.INSERT);
- }
-
- public void test(ExecMode exeMode) throws Exception
- {
- Configuration conf = new Configuration(false);
-
- LocalMode lma = LocalMode.newInstance();
- DAG dag = lma.getDAG();
-
- super.populateDAG(dag, conf);
- storeOperator.execMode = exeMode;
-
- StreamingApplication app = new StreamingApplication()
- {
- @Override
- public void populateDAG(DAG dag, Configuration conf)
- {
- }
- };
-
- lma.prepareDAG(app, conf);
-
- // Create local cluster
- final LocalMode.Controller lc = lma.getController();
- lc.run(300000);
-
- lc.shutdown();
- }
-
-
-
- @Override
- public String getStoreBasePath(Configuration conf)
- {
- return basePath;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/benchmark/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/benchmark/src/test/resources/log4j.properties b/benchmark/src/test/resources/log4j.properties
index cf0d19e..3fc0120 100644
--- a/benchmark/src/test/resources/log4j.properties
+++ b/benchmark/src/test/resources/log4j.properties
@@ -41,3 +41,5 @@ log4j.logger.org=info
#log4j.logger.org.apache.commons.beanutils=warn
log4j.logger.com.datatorrent=debug
log4j.logger.org.apache.apex=debug
+log4j.logger.org.apache.apex.malhar.lib.state.managed=info
+log4j.logger.com.datatorrent.common.util.FSStorageAgent=info
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
index dd2bbab..20271b0 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -125,7 +125,7 @@ import com.datatorrent.netlet.util.Slice;
*/
public abstract class AbstractManagedStateImpl
implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext,
- TimeBucketAssigner.PurgeListener
+ TimeBucketAssigner.PurgeListener, BucketProvider
{
private long maxMemorySize;
@@ -319,11 +319,24 @@ public abstract class AbstractManagedStateImpl
return (int)(bucketId % numBuckets);
}
- Bucket getBucket(long bucketId)
+ @Override
+ public Bucket getBucket(long bucketId)
{
return buckets[getBucketIdx(bucketId)];
}
+ /**
+ * Returns the bucket for the given id, creating it first when none exists yet.
+ * A newly created bucket is set up with this instance and stored in the buckets array
+ * at the index computed by {@code getBucketIdx(bucketId)}.
+ *
+ * @param bucketId id of the bucket
+ * @return the existing or newly created bucket
+ */
+ @Override
+ public Bucket ensureBucket(long bucketId)
+ {
+ Bucket b = getBucket(bucketId);
+ if (b == null) {
+ b = newBucket(bucketId);
+ b.setup(this);
+ buckets[getBucketIdx(bucketId)] = b;
+ }
+ return b;
+ }
+
protected Bucket newBucket(long bucketId)
{
return new Bucket.DefaultBucket(bucketId);
@@ -384,6 +397,22 @@ public abstract class AbstractManagedStateImpl
}
}
+ /**
+ * Gets the memory usage of each bucket, computed as the size of the bucket's key stream
+ * plus the size of its value stream. Slots of the buckets array that are still null are skipped.
+ *
+ * @return The map of bucket id to memory size used by the bucket
+ */
+ public Map<Long, Long> getBucketMemoryUsage()
+ {
+ Map<Long, Long> bucketToSize = Maps.newHashMap();
+ for (Bucket bucket : buckets) {
+ if (bucket == null) {
+ continue;
+ }
+ bucketToSize.put(bucket.getBucketId(), bucket.getKeyStream().size() + bucket.getValueStream().size());
+ }
+ return bucketToSize;
+ }
+
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
@Override
public void teardown()
@@ -476,6 +505,7 @@ public abstract class AbstractManagedStateImpl
this.keyComparator = Preconditions.checkNotNull(keyComparator);
}
+ @Override
public BucketsFileSystem getBucketsFileSystem()
{
return bucketsFileSystem;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
index 4fc2327..cbc4e03 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -24,6 +24,7 @@ import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -32,6 +33,10 @@ import javax.validation.constraints.NotNull;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.malhar.lib.utils.serde.KeyValueByteStreamProvider;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.apex.malhar.lib.utils.serde.WindowedBlockStream;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
@@ -46,7 +51,7 @@ import com.datatorrent.netlet.util.Slice;
*
* @since 3.4.0
*/
-public interface Bucket extends ManagedStateComponent
+public interface Bucket extends ManagedStateComponent, KeyValueByteStreamProvider
{
/**
* @return bucket id
@@ -218,13 +223,22 @@ public interface Bucket extends ManagedStateComponent
private transient TreeMap<Long, BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
+ /**
+ * By default, separate keys and values into two different streams.
+ * key stream and value stream should be created during construction instead of setup, as the reference of the streams will be passed to the serialize method
+ */
+ protected WindowedBlockStream keyStream = new WindowedBlockStream();
+ protected WindowedBlockStream valueStream = new WindowedBlockStream();
+
+ protected ConcurrentLinkedQueue<Long> windowsForFreeMemory = new ConcurrentLinkedQueue<>();
+
private DefaultBucket()
{
//for kryo
bucketId = -1;
}
- protected DefaultBucket(long bucketId)
+ public DefaultBucket(long bucketId)
{
this.bucketId = bucketId;
}
@@ -321,6 +335,9 @@ public interface Bucket extends ManagedStateComponent
@Override
public Slice get(Slice key, long timeBucket, ReadSource readSource)
{
+ // This call is lightweight
+ releaseMemory();
+ key = SliceUtils.toBufferSlice(key);
switch (readSource) {
case MEMORY:
return getFromMemory(key);
@@ -392,6 +409,11 @@ public interface Bucket extends ManagedStateComponent
@Override
public void put(Slice key, long timeBucket, Slice value)
{
+ // This call is lightweight
+ releaseMemory();
+ key = SliceUtils.toBufferSlice(key);
+ value = SliceUtils.toBufferSlice(value);
+
BucketedValue bucketedValue = flash.get(key);
if (bucketedValue == null) {
bucketedValue = new BucketedValue(timeBucket, value);
@@ -409,39 +431,45 @@ public interface Bucket extends ManagedStateComponent
}
}
+ /**
+ * Free memory up to the given windowId
+ * This method will be called by another thread. Adding concurrency control to Stream would impact the performance.
+ * This method only calculates the size of the memory that could be released and then sends free memory request to the operator thread
+ */
@Override
public long freeMemory(long windowId) throws IOException
{
- long memoryFreed = 0;
- Long clearWindowId;
-
- while ((clearWindowId = committedData.floorKey(windowId)) != null) {
- Map<Slice, BucketedValue> windowData = committedData.remove(clearWindowId);
+ // calculate the size first and then send the release memory request. It could reduce the chance of conflict and increase the performance.
+ long size = keyStream.dataSizeUpToWindow(windowId) + valueStream.dataSizeUpToWindow(windowId);
+ windowsForFreeMemory.add(windowId);
+ return size;
+ }
- for (Map.Entry<Slice, BucketedValue> entry: windowData.entrySet()) {
- memoryFreed += entry.getKey().length + entry.getValue().getSize();
- }
+ /**
+ * This operation must be called from operator thread. It won't do anything if no memory to be freed
+ */
+ protected long releaseMemory()
+ {
+ long memoryFreed = 0;
+ while (!windowsForFreeMemory.isEmpty()) {
+ long windowId = windowsForFreeMemory.poll();
+ long originSize = keyStream.size() + valueStream.size();
+ keyStream.completeWindow(windowId);
+ valueStream.completeWindow(windowId);
+ memoryFreed += originSize - (keyStream.size() + valueStream.size());
}
- fileCache.clear();
- if (cachedBucketMetas != null) {
-
- for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas.values()) {
- FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
- if (reader != null) {
- memoryFreed += tbm.getSizeInBytes();
- reader.close();
- }
- }
+ if (memoryFreed > 0) {
+ LOG.debug("Total freed memory size: {}", memoryFreed);
+ sizeInBytes.getAndAdd(-memoryFreed);
}
- sizeInBytes.getAndAdd(-memoryFreed);
- LOG.debug("space freed {} {}", bucketId, memoryFreed);
return memoryFreed;
}
@Override
public Map<Slice, BucketedValue> checkpoint(long windowId)
{
+ releaseMemory();
try {
//transferring the data from flash to check-pointed state in finally block and re-initializing the flash.
return flash;
@@ -548,6 +576,19 @@ public interface Bucket extends ManagedStateComponent
return checkpointedData;
}
+
+ /**
+ * @return the key stream instance held by this bucket
+ */
+ @Override
+ public WindowedBlockStream getKeyStream()
+ {
+ return keyStream;
+ }
+
+ /**
+ * @return the value stream instance held by this bucket
+ */
+ @Override
+ public WindowedBlockStream getValueStream()
+ {
+ return valueStream;
+ }
+
private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
new file mode 100644
index 0000000..bbd18ac
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketProvider.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * This interface declares methods to get a bucket by its bucket id,
+ * optionally creating the bucket when it does not exist yet.
+ *
+ */
+public interface BucketProvider
+{
+ /**
+ * Gets the bucket with the given bucket id.
+ *
+ * @param bucketId id of the bucket to look up
+ * @return the bucket for {@code bucketId}.
+ * NOTE(review): implementations may return null when the bucket has not been created
+ * (AbstractManagedStateImpl.ensureBucket checks for null) - confirm the intended contract
+ */
+ public Bucket getBucket(long bucketId);
+
+ /**
+ * Creates the bucket if it does not exist, then returns it.
+ *
+ * @param bucketId id of the bucket to look up or create
+ * @return the existing or newly created bucket for {@code bucketId}
+ */
+ public Bucket ensureBucket(long bucketId);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
index a59872c..d0ca9ff 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -26,9 +26,9 @@ import java.util.ListIterator;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.utils.serde.CollectionSerde;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeCollectionSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
@@ -37,7 +37,6 @@ import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
@@ -58,11 +57,10 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
@NotNull
private SpillableStateStore store;
@NotNull
- private Serde<T, Slice> serde;
+ private Serde<T> serde;
@NotNull
private SpillableMapImpl<Integer, List<T>> map;
- private boolean sizeCached = false;
private int size;
private int numBatches;
@@ -86,15 +84,15 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
*/
public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
@NotNull SpillableStateStore store,
- @NotNull Serde<T, Slice> serde)
+ @NotNull Serde<T> serde)
{
this.bucketId = bucketId;
this.prefix = Preconditions.checkNotNull(prefix);
this.store = Preconditions.checkNotNull(store);
this.serde = Preconditions.checkNotNull(serde);
- map = new SpillableMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(),
- new SerdeCollectionSlice<>(serde, (Class<List<T>>)(Class)ArrayList.class));
+ map = new SpillableMapImpl<>(store, prefix, bucketId, new IntSerde(),
+ new CollectionSerde<T, List<T>>(serde, (Class)ArrayList.class));
}
/**
@@ -111,7 +109,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
*/
public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
@NotNull SpillableStateStore store,
- @NotNull Serde<T, Slice> serde,
+ @NotNull Serde<T> serde,
int batchSize)
{
this(bucketId, prefix, store, serde);
@@ -328,6 +326,7 @@ public class SpillableArrayListImpl<T> implements Spillable.SpillableList<T>, Sp
@Override
public void setup(Context.OperatorContext context)
{
+ store.ensureBucket(bucketId);
map.setup(context);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
index 0944583..d3340ce 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListMultimapImpl.java
@@ -26,10 +26,10 @@ import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
@@ -62,10 +62,11 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
@NotNull
private SpillableMapImpl<Slice, Integer> map;
private SpillableStateStore store;
- private byte[] identifier;
private long bucket;
- private Serde<K, Slice> serdeKey;
- private Serde<V, Slice> serdeValue;
+ private Serde<V> valueSerde;
+
+ protected transient Context.OperatorContext context;
+ protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
private SpillableArrayListMultimapImpl()
{
@@ -78,20 +79,20 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
* @param identifier The Id of this {@link SpillableArrayListMultimapImpl}.
* @param bucket The Id of the bucket used to store this
* {@link SpillableArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+ * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+ * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
*/
public SpillableArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ Serde<K> keySerde,
+ Serde<V> valueSerde)
{
this.store = Preconditions.checkNotNull(store);
- this.identifier = Preconditions.checkNotNull(identifier);
this.bucket = bucket;
- this.serdeKey = Preconditions.checkNotNull(serdeKey);
- this.serdeValue = Preconditions.checkNotNull(serdeValue);
+ this.valueSerde = Preconditions.checkNotNull(valueSerde);
+
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(SIZE_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
- map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdeIntSlice());
+ map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new IntSerde());
}
public SpillableStateStore getStore()
@@ -110,15 +111,12 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
if (spillableArrayList == null) {
- Slice keySlice = serdeKey.serialize(key);
- Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX));
-
+ Integer size = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
if (size == null) {
return null;
}
- Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
- spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+ spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, false).toByteArray(), store, valueSerde);
spillableArrayList.setSize(size);
}
@@ -179,8 +177,7 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
@Override
public boolean containsKey(@Nullable Object key)
{
- return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
- SIZE_KEY_SUFFIX));
+ return cache.contains((K)key) || map.containsKey(keyValueSerdeManager.serializeMetaKey((K)key, false));
}
@Override
@@ -217,9 +214,9 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
if (spillableArrayList == null) {
- Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
- spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
-
+ Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, true);
+ spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, valueSerde);
+ spillableArrayList.setup(context);
cache.put(key, spillableArrayList);
}
@@ -272,14 +269,19 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
@Override
public void setup(Context.OperatorContext context)
{
+ this.context = context;
+
map.setup(context);
isRunning = true;
+
+ keyValueSerdeManager.setup(store, bucket);
}
@Override
public void beginWindow(long windowId)
{
map.beginWindow(windowId);
+ keyValueSerdeManager.beginWindow(windowId);
isInWindow = true;
}
@@ -292,13 +294,14 @@ public class SpillableArrayListMultimapImpl<K, V> implements Spillable.Spillable
SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
spillableArrayList.endWindow();
- Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX),
- spillableArrayList.size());
+ map.put(keyValueSerdeManager.serializeMetaKey(key, true), spillableArrayList.size());
}
Preconditions.checkState(cache.getRemovedKeys().isEmpty());
cache.endWindow();
map.endWindow();
+
+ keyValueSerdeManager.resetReadBuffer();
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index c4462d5..542a914 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -24,7 +24,6 @@ import org.apache.apex.malhar.lib.utils.serde.Serde;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.Operator;
-import com.datatorrent.netlet.util.Slice;
/**
* This is a composite component containing spillable data structures. This should be used as
@@ -43,7 +42,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
* @return A {@link SpillableList}.
*/
- <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde);
+ <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableList}.
@@ -53,7 +52,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableList}.
* @return A {@link SpillableList}.
*/
- <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde);
+ <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableMap}. This method
@@ -65,8 +64,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serdeValue The Serializer/Deserializer to use for the map's values.
* @return A {@link SpillableMap}.
*/
- <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue);
+ <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableMap}.
@@ -79,7 +78,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @return A {@link SpillableMap}.
*/
<K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue);
+ Serde<K> serdeKey, Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableListMultimap}. This method
@@ -91,8 +90,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
* @return A {@link SpillableListMultimap}.
*/
- <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
- Slice> serdeKey, Serde<V, Slice> serdeValue);
+ <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableListMultimap}.
@@ -105,8 +103,8 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @return A {@link SpillableListMultimap}.
*/
<K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue);
+ Serde<K> serdeKey,
+ Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableSetMultimap}.
@@ -117,8 +115,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serdeValue The Serializer/Deserializer to use for the values in the map's lists.
* @return A {@link SpillableSetMultimap}.
*/
- <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
- Slice> serdeKey, Serde<V, Slice> serdeValue);
+ <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue);
/**
* This is a method for creating a {@link SpillableMultiset}. This method
@@ -128,7 +125,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
* @return A {@link SpillableMultiset}.
*/
- <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde);
+ <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableMultiset}.
@@ -138,7 +135,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableMultiset}.
* @return A {@link SpillableMultiset}.
*/
- <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde);
+ <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableQueue}. This method
@@ -148,7 +145,7 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
* @return A {@link SpillableQueue}.
*/
- <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde);
+ <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde);
/**
* This is a method for creating a {@link SpillableQueue}.
@@ -158,5 +155,5 @@ public interface SpillableComplexComponent extends Component<OperatorContext>, S
* @param serde The Serializer/Deserializer to use for data stored in the {@link SpillableQueue}.
* @return A {@link SpillableQueue}.
*/
- <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde);
+ <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
index aad219d..1a3f550 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -19,6 +19,7 @@
package org.apache.apex.malhar.lib.state.spillable;
import java.util.List;
+import java.util.Set;
import javax.validation.constraints.NotNull;
@@ -27,9 +28,9 @@ import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* This is a factory that is used for Spillable datastructures. This component is used by nesting it inside of an
@@ -50,6 +51,11 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
@NotNull
private SpillableIdentifierGenerator identifierGenerator;
+ /**
+ * Bucket ids registered by the factory methods; kept so that all the buckets can be created during setup.
+ */
+ protected transient Set<Long> bucketIds = Sets.newHashSet();
+
private SpillableComplexComponentImpl()
{
// for kryo
@@ -66,84 +72,99 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator);
}
- public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableList<T> newSpillableArrayList(long bucket, Serde<T> serde)
{
SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
componentList.add(list);
return list;
}
- public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T> serde)
{
identifierGenerator.register(identifier);
SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
+ bucketIds.add(bucket);
componentList.add(list);
return list;
}
- public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ @Override
+ public <K, V> SpillableMap<K, V> newSpillableMap(long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue)
{
SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifierGenerator.next(),
bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
- public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ @Override
+ public <K, V> SpillableMap<K, V> newSpillableMap(byte[] identifier, long bucket, Serde<K> serdeKey,
+ Serde<V> serdeValue)
{
identifierGenerator.register(identifier);
SpillableMapImpl<K, V> map = new SpillableMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
- public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K,
- Slice> serdeKey, Serde<V, Slice> serdeValue)
+ @Override
+ public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
{
SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
identifierGenerator.next(), bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
+ @Override
public <K, V> SpillableListMultimap<K, V> newSpillableArrayListMultimap(byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ Serde<K> serdeKey,
+ Serde<V> serdeValue)
{
identifierGenerator.register(identifier);
SpillableArrayListMultimapImpl<K, V> map = new SpillableArrayListMultimapImpl<K, V>(store,
identifier, bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
- public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K,
- Slice> serdeKey, Serde<V, Slice> serdeValue)
+ @Override
+ public <K, V> SpillableSetMultimap<K, V> newSpillableSetMultimap(long bucket, Serde<K> serdeKey, Serde<V> serdeValue)
{
SpillableSetMultimapImpl<K, V> map = new SpillableSetMultimapImpl<K, V>(store,
identifierGenerator.next(), bucket, serdeKey, serdeValue);
+ bucketIds.add(bucket);
componentList.add(map);
return map;
}
- public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableMultiset<T> newSpillableMultiset(long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
- public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableMultiset<T> newSpillableMultiset(byte[] identifier, long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
- public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
- public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ @Override
+ public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T> serde)
{
throw new UnsupportedOperationException("Unsupported Operation");
}
@@ -152,6 +173,15 @@ public class SpillableComplexComponentImpl implements SpillableComplexComponent
public void setup(Context.OperatorContext context)
{
store.setup(context);
+
+ //ensure buckets created.
+ for (long bucketId : bucketIds) {
+ store.ensureBucket(bucketId);
+ }
+
+ //the bucket ids are only for setup. We don't need bucket ids during run time.
+ bucketIds.clear();
+
for (SpillableComponent spillableComponent: componentList) {
spillableComponent.setup(context);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
index 016aeec..5fa39d7 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableMapImpl.java
@@ -26,13 +26,13 @@ import java.util.Set;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.google.common.base.Preconditions;
@@ -51,21 +51,20 @@ import com.datatorrent.netlet.util.Slice;
public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spillable.SpillableComponent,
Serializable
{
+ private static final long serialVersionUID = 4552547110215784584L;
private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
- private transient MutableInt tempOffset = new MutableInt();
+ private transient Input tmpInput = new Input();
@NotNull
private SpillableStateStore store;
@NotNull
private byte[] identifier;
private long bucket;
- @NotNull
- private Serde<K, Slice> serdeKey;
- @NotNull
- private Serde<V, Slice> serdeValue;
private int size = 0;
+ protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+
private SpillableMapImpl()
{
//for kryo
@@ -77,17 +76,16 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
* @param identifier The Id of this {@link SpillableMapImpl}.
* @param bucket The Id of the bucket used to store this
* {@link SpillableMapImpl} in the provided {@link SpillableStateStore}.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
- * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+ * @param keySerde The {@link Serde} to use when serializing and deserializing keys.
+ * @param valueSerde The {@link Serde} to use when serializing and deserializing values.
*/
- public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ public SpillableMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K> keySerde,
+ Serde<V> valueSerde)
{
this.store = Preconditions.checkNotNull(store);
this.identifier = Preconditions.checkNotNull(identifier);
this.bucket = bucket;
- this.serdeKey = Preconditions.checkNotNull(serdeKey);
- this.serdeValue = Preconditions.checkNotNull(serdeValue);
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(null, identifier, Preconditions.checkNotNull(keySerde), Preconditions.checkNotNull(valueSerde));
}
public SpillableStateStore getStore()
@@ -134,16 +132,17 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
return val;
}
- Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+ Slice valSlice = store.getSync(bucket, keyValueSerdeManager.serializeDataKey(key, false));
if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
return null;
}
- tempOffset.setValue(0);
- return serdeValue.deserialize(valSlice, tempOffset);
+ tmpInput.setBuffer(valSlice.buffer, valSlice.offset, valSlice.length);
+ return keyValueSerdeManager.deserializeValue(tmpInput);
}
+
@Override
public V put(K k, V v)
{
@@ -207,6 +206,8 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
@Override
public void setup(Context.OperatorContext context)
{
+ store.ensureBucket(bucket);
+ keyValueSerdeManager.setup(store, bucket);
}
@Override
@@ -218,16 +219,15 @@ public class SpillableMapImpl<K, V> implements Spillable.SpillableMap<K, V>, Spi
public void endWindow()
{
for (K key: cache.getChangedKeys()) {
- store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
- serdeValue.serialize(cache.get(key)));
+ store.put(bucket, keyValueSerdeManager.serializeDataKey(key, true),
+ keyValueSerdeManager.serializeValue(cache.get(key)));
}
for (K key: cache.getRemovedKeys()) {
- store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
- new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
+ store.put(this.bucket, keyValueSerdeManager.serializeDataKey(key, true), BufferSlice.EMPTY_SLICE);
}
-
cache.endWindow();
+ keyValueSerdeManager.resetReadBuffer();
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
index c2741b0..0dfc411 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetImpl.java
@@ -26,15 +26,15 @@ import java.util.NoSuchElementException;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.hadoop.classification.InterfaceStability;
import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
import com.esotericsoftware.kryo.serializers.FieldSerializer;
import com.google.common.base.Preconditions;
import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
/**
* A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
@@ -62,49 +62,30 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
T next;
}
- public static class SerdeListNodeSlice<T> implements Serde<ListNode<T>, Slice>
+ public static class ListNodeSerde<T> implements Serde<ListNode<T>>
{
- private Serde<T, Slice> serde;
- private static Slice falseSlice = new Slice(new byte[]{0});
- private static Slice trueSlice = new Slice(new byte[]{1});
+ private Serde<T> serde;
- public SerdeListNodeSlice(@NotNull Serde<T, Slice> serde)
+ public ListNodeSerde(@NotNull Serde<T> serde)
{
this.serde = Preconditions.checkNotNull(serde);
}
@Override
- public Slice serialize(ListNode<T> object)
+ public void serialize(ListNode<T> object, Output output)
{
- int size = 0;
-
- Slice slice1 = object.valid ? trueSlice : falseSlice;
- size += 1;
- Slice slice2 = serde.serialize(object.next);
- size += slice2.length;
-
- byte[] bytes = new byte[size];
- System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length);
- System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length);
-
- return new Slice(bytes);
+ output.writeBoolean(object.valid);
+ serde.serialize(object.next, output);
}
@Override
- public ListNode<T> deserialize(Slice slice, MutableInt offset)
+ public ListNode<T> deserialize(Input input)
{
ListNode<T> result = new ListNode<>();
- result.valid = slice.buffer[offset.intValue()] != 0;
- offset.add(1);
- result.next = serde.deserialize(slice, offset);
+ result.valid = input.readBoolean();
+ result.next = serde.deserialize(input);
return result;
}
-
- @Override
- public ListNode<T> deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0));
- }
}
@NotNull
@@ -135,11 +116,11 @@ public class SpillableSetImpl<T> implements Spillable.SpillableSet<T>, Spillable
*/
public SpillableSetImpl(long bucketId, @NotNull byte[] prefix,
@NotNull SpillableStateStore store,
- @NotNull Serde<T, Slice> serde)
+ @NotNull Serde<T> serde)
{
this.store = Preconditions.checkNotNull(store);
- map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new SerdeListNodeSlice(serde));
+ map = new SpillableMapImpl<>(store, prefix, bucketId, serde, new ListNodeSerde(serde));
}
public void setSize(int size)
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
index 98f60d2..76e47f2 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableSetMultimapImpl.java
@@ -27,11 +27,11 @@ import java.util.Set;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.utils.serde.AffixKeyValueSerdeManager;
+import org.apache.apex.malhar.lib.utils.serde.IntSerde;
+import org.apache.apex.malhar.lib.utils.serde.PairSerde;
import org.apache.apex.malhar.lib.utils.serde.PassThruSliceSerde;
import org.apache.apex.malhar.lib.utils.serde.Serde;
-import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
-import org.apache.apex.malhar.lib.utils.serde.SerdePairSlice;
-import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.classification.InterfaceStability;
@@ -65,10 +65,11 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
private SpillableStateStore store;
private byte[] identifier;
private long bucket;
- private Serde<K, Slice> serdeKey;
- private Serde<V, Slice> serdeValue;
+ private Serde<V> valueSerde;
private transient List<SpillableSetImpl<V>> removedSets = new ArrayList<>();
+ protected AffixKeyValueSerdeManager<K, V> keyValueSerdeManager;
+ protected transient Context.OperatorContext context;
private SpillableSetMultimapImpl()
{
// for kryo
@@ -84,16 +85,15 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
* @param serdeKey The {@link Serde} to use when serializing and deserializing values.
*/
public SpillableSetMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
+ Serde<K> keySerde,
+ Serde<V> valueSerde)
{
this.store = Preconditions.checkNotNull(store);
- this.identifier = Preconditions.checkNotNull(identifier);
this.bucket = bucket;
- this.serdeKey = Preconditions.checkNotNull(serdeKey);
- this.serdeValue = Preconditions.checkNotNull(serdeValue);
+ this.valueSerde = Preconditions.checkNotNull(valueSerde);
+ keyValueSerdeManager = new AffixKeyValueSerdeManager<K, V>(META_KEY_SUFFIX, identifier, Preconditions.checkNotNull(keySerde), valueSerde);
- map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new SerdePairSlice<>(new SerdeIntSlice(), serdeValue));
+ map = new SpillableMapImpl(store, identifier, bucket, new PassThruSliceSerde(), new PairSerde<>(new IntSerde(), valueSerde));
}
public SpillableStateStore getStore()
@@ -112,17 +112,17 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = cache.get(key);
if (spillableSet == null) {
- Slice keySlice = serdeKey.serialize(key);
- Pair<Integer, V> meta = map.get(SliceUtils.concatenate(keySlice, META_KEY_SUFFIX));
+ Pair<Integer, V> meta = map.get(keyValueSerdeManager.serializeMetaKey(key, false));
if (meta == null) {
return null;
}
- Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
- spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+ Slice keyPrefix = keyValueSerdeManager.serializeDataKey(key, false);
+ spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, valueSerde);
spillableSet.setSize(meta.getLeft());
spillableSet.setHead(meta.getRight());
+ spillableSet.setup(context);
}
cache.put(key, spillableSet);
@@ -166,7 +166,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = getHelper((K)key);
if (spillableSet != null) {
cache.remove((K)key);
- Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+ Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
map.put(keySlice, new ImmutablePair<>(0, spillableSet.getHead()));
spillableSet.clear();
removedSets.add(spillableSet);
@@ -199,7 +199,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
if (cache.contains((K)key)) {
return true;
}
- Slice keySlice = SliceUtils.concatenate(serdeKey.serialize((K)key), META_KEY_SUFFIX);
+ Slice keySlice = keyValueSerdeManager.serializeMetaKey((K)key, false);
Pair<Integer, V> meta = map.get(keySlice);
return meta != null && meta.getLeft() > 0;
}
@@ -227,8 +227,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = getHelper(key);
if (spillableSet == null) {
- Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
- spillableSet = new SpillableSetImpl<>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+ spillableSet = new SpillableSetImpl<V>(bucket, keyValueSerdeManager.serializeDataKey(key, true).toByteArray(), store, valueSerde);
+ spillableSet.setup(context);
cache.put(key, spillableSet);
}
return spillableSet.add(value);
@@ -284,13 +284,16 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
@Override
public void setup(Context.OperatorContext context)
{
+ this.context = context;
map.setup(context);
+ keyValueSerdeManager.setup(store, bucket);
}
@Override
public void beginWindow(long windowId)
{
map.beginWindow(windowId);
+ keyValueSerdeManager.beginWindow(windowId);
}
@Override
@@ -301,7 +304,7 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
SpillableSetImpl<V> spillableSet = cache.get(key);
spillableSet.endWindow();
- map.put(SliceUtils.concatenate(serdeKey.serialize(key), META_KEY_SUFFIX),
+ map.put(keyValueSerdeManager.serializeMetaKey(key, true),
new ImmutablePair<>(spillableSet.size(), spillableSet.getHead()));
}
@@ -311,6 +314,8 @@ public class SpillableSetMultimapImpl<K, V> implements Spillable.SpillableSetMul
cache.endWindow();
map.endWindow();
+
+ keyValueSerdeManager.resetReadBuffer();
}
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
index b6ee3c0..44f003b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
@@ -19,6 +19,7 @@
package org.apache.apex.malhar.lib.state.spillable;
import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.managed.BucketProvider;
import org.apache.hadoop.classification.InterfaceStability;
import com.datatorrent.api.Component;
@@ -32,6 +33,6 @@ import com.datatorrent.api.Operator;
*/
@InterfaceStability.Evolving
public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
- Operator.CheckpointNotificationListener, WindowListener
+ Operator.CheckpointNotificationListener, WindowListener, BucketProvider
{
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
index 0e1d55e..e80d38d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
@@ -21,6 +21,9 @@ package org.apache.apex.malhar.lib.state.spillable;
import java.util.Map;
import java.util.Set;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.base.Preconditions;
@@ -39,6 +42,7 @@ import com.google.common.collect.Sets;
@InterfaceStability.Evolving
public class WindowBoundedMapCache<K, V>
{
+ private static final transient Logger logger = LoggerFactory.getLogger(WindowBoundedMapCache.class);
public static final int DEFAULT_MAX_SIZE = 50000;
private int maxSize = DEFAULT_MAX_SIZE;
@@ -109,7 +113,6 @@ public class WindowBoundedMapCache<K, V>
Note: beginWindow is intentionally not implemented because many users need a cache that does not require
beginWindow to be called.
*/
-
public void endWindow()
{
int count = cache.size() - maxSize;
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
index 61ab8a8..8acb044 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
@@ -23,7 +23,10 @@ import java.util.concurrent.Future;
import javax.validation.constraints.NotNull;
+import org.apache.apex.malhar.lib.state.managed.Bucket;
import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.apex.malhar.lib.utils.serde.BufferSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.collect.Maps;
@@ -74,6 +77,8 @@ public class InMemSpillableStateStore implements SpillableStateStore
bucket = Maps.newHashMap();
store.put(bucketId, bucket);
}
+ key = SliceUtils.toBufferSlice(key);
+ value = SliceUtils.toBufferSlice(value);
bucket.put(key, value);
}
@@ -88,6 +93,10 @@ public class InMemSpillableStateStore implements SpillableStateStore
store.put(bucketId, bucket);
}
+ if (key.getClass() == Slice.class) {
+ //The hashCode of Slice is not correct, so wrap the key in a BufferSlice to correct it
+ key = new BufferSlice(key);
+ }
return bucket.get(key);
}
@@ -117,4 +126,21 @@ public class InMemSpillableStateStore implements SpillableStateStore
{
return store.toString();
}
+
+ protected Bucket.DefaultBucket bucket;
+
+ @Override
+ public Bucket getBucket(long bucketId)
+ {
+ return bucket;
+ }
+
+ @Override
+ public Bucket ensureBucket(long bucketId)
+ {
+ if (bucket == null) {
+ bucket = new Bucket.DefaultBucket(1);
+ }
+ return bucket;
+ }
}
[2/3] apex-malhar git commit: APEXMALHAR-2190 #resolve #comment Use
reusable buffer for serialization in spillable data structures
Posted by th...@apache.org.
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
new file mode 100644
index 0000000..57638d8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixKeyValueSerdeManager.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.managed.Bucket;
+
+import com.esotericsoftware.kryo.io.Input;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Serde manager used by the spillable data structures. Keys are serialized through one of two
+ * {@link AffixSerde}s: the "meta" flavour appends a suffix to the serialized key, the "data"
+ * flavour prepends an identifier to it. Values are deserialized with the value serde.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class AffixKeyValueSerdeManager<K, V> extends KeyValueSerdeManager<K, V>
+{
+  /**
+   * Note on the buffers used below: the read buffer will be released when the read is done, while the
+   * write buffer should be held until the data has been persisted. The write buffer should be non-transient.
+   * Data which has already been saved to files will be removed by {@link Bucket}, while data which
+   * has not been saved yet needs to be recovered by the platform from the checkpoint.
+   */
+  private AffixSerde<K> metaKeySerde;
+  private AffixSerde<K> dataKeySerde;
+
+  private AffixKeyValueSerdeManager()
+  {
+    // for Kryo
+  }
+
+  /**
+   * @param metaKeySuffix bytes appended after the serialized key when building a meta key
+   * @param dataKeyIdentifier bytes written before the serialized key when building a data key
+   * @param keySerde serde for the bare key
+   * @param valueSerde serde for the value
+   */
+  public AffixKeyValueSerdeManager(byte[] metaKeySuffix, byte[] dataKeyIdentifier, Serde<K> keySerde, Serde<V> valueSerde)
+  {
+    this.valueSerde = valueSerde;
+    this.metaKeySerde = new AffixSerde<K>(null, keySerde, metaKeySuffix);
+    this.dataKeySerde = new AffixSerde<K>(dataKeyIdentifier, keySerde, null);
+  }
+
+  /**
+   * Serializes the key in its "meta" form.
+   *
+   * @param key the key to serialize
+   * @param write true to use the write key buffer, false to use the read key buffer
+   * @return the slice returned by the chosen buffer
+   */
+  public Slice serializeMetaKey(K key, boolean write)
+  {
+    return serializeKey(metaKeySerde, key, write);
+  }
+
+  /**
+   * Serializes the key in its "data" form.
+   *
+   * @param key the key to serialize
+   * @param write true to use the write key buffer, false to use the read key buffer
+   * @return the slice returned by the chosen buffer
+   */
+  public Slice serializeDataKey(K key, boolean write)
+  {
+    return serializeKey(dataKeySerde, key, write);
+  }
+
+  /**
+   * Deserializes a value from the input with the value serde.
+   */
+  public V deserializeValue(Input input)
+  {
+    return valueSerde.deserialize(input);
+  }
+
+  /**
+   * Shared implementation of the two key serializers: picks the buffer, serializes, returns the slice.
+   */
+  private Slice serializeKey(AffixSerde<K> affixSerde, K key, boolean write)
+  {
+    final SerializationBuffer target;
+    if (write) {
+      target = keyBufferForWrite;
+    } else {
+      target = keyBufferForRead;
+    }
+    affixSerde.serialize(key, target);
+    return target.toSlice();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
new file mode 100644
index 0000000..7504633
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/AffixSerde.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * A {@link Serde} decorator that writes an optional fixed prefix before, and an optional fixed
+ * suffix after, the bytes produced by the wrapped serde.
+ *
+ * @param <T> the type handled by the wrapped serde
+ */
+public class AffixSerde<T> implements Serde<T>
+{
+  private Serde<T> serde;
+  private byte[] prefix;
+  private byte[] suffix;
+
+  private AffixSerde()
+  {
+    // for Kryo
+  }
+
+  /**
+   * @param prefix bytes written before the object; null or empty means no prefix
+   * @param serde the serde doing the actual object serialization
+   * @param suffix bytes written after the object; null or empty means no suffix
+   */
+  public AffixSerde(byte[] prefix, Serde<T> serde, byte[] suffix)
+  {
+    this.serde = serde;
+    this.prefix = prefix;
+    this.suffix = suffix;
+  }
+
+  /**
+   * Writes prefix (if any), the object, then suffix (if any) to the output.
+   */
+  @Override
+  public void serialize(T object, Output output)
+  {
+    if (!isEmpty(prefix)) {
+      output.write(prefix);
+    }
+    serde.serialize(object, output);
+    if (!isEmpty(suffix)) {
+      output.write(suffix);
+    }
+  }
+
+  /**
+   * Skips the prefix bytes (if any) and lets the wrapped serde read the object.
+   * The suffix is not consumed by this method.
+   */
+  @Override
+  public T deserialize(Input input)
+  {
+    if (!isEmpty(prefix)) {
+      input.skip(prefix.length);
+    }
+    return serde.deserialize(input);
+  }
+
+  private static boolean isEmpty(byte[] affix)
+  {
+    return affix == null || affix.length == 0;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
new file mode 100644
index 0000000..4b2a45b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/ArraySerde.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.lang.reflect.Array;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+
+/**
+ * A {@link Serde} for arrays. The serialized form is the element count (variable-length,
+ * positive-optimized int) followed by each element serialized with the item serde.
+ *
+ * @param <T> the element type
+ */
+public class ArraySerde<T> implements Serde<T[]>
+{
+  private Serde<T> itemSerde;
+  private Class<T> itemType;
+
+  private ArraySerde()
+  {
+  }
+
+  /**
+   * Serializer and Deserializer need different constructors, so static factory methods are used to wrap them.
+   * The ArraySerde returned by newSerializer can only be used for serialization, because it does
+   * not know the element type needed to create the array on deserialization.
+   */
+  public static <T> ArraySerde<T> newSerializer(Serde<T> itemSerde)
+  {
+    return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde));
+  }
+
+  /**
+   * Creates an ArraySerde usable for both serialization and deserialization.
+   *
+   * @param itemSerde serde for a single element
+   * @param itemType element class, used to instantiate the array on deserialization
+   */
+  public static <T> ArraySerde<T> newSerde(Serde<T> itemSerde, Class<T> itemType)
+  {
+    return new ArraySerde<T>(Preconditions.checkNotNull(itemSerde), Preconditions.checkNotNull(itemType));
+  }
+
+  private ArraySerde(Serde<T> itemSerde)
+  {
+    this.itemSerde = itemSerde;
+  }
+
+  private ArraySerde(Serde<T> itemSerde, Class<T> itemType)
+  {
+    this.itemSerde = itemSerde;
+    this.itemType = itemType;
+  }
+
+  /**
+   * Writes the element count followed by every element.
+   */
+  @Override
+  public void serialize(T[] objects, Output output)
+  {
+    // The count is written even for an empty array: deserialize() unconditionally reads it,
+    // so skipping it would make an empty array impossible to read back.
+    output.writeInt(objects.length, true);
+    Serde<T> serializer = getItemSerde();
+    for (T object : objects) {
+      serializer.serialize(object, output);
+    }
+  }
+
+  protected Serde<T> getItemSerde()
+  {
+    return itemSerde;
+  }
+
+  /**
+   * Reads the element count and then that many elements.
+   *
+   * @throws IllegalStateException if this instance was created without an element type
+   */
+  @Override
+  public T[] deserialize(Input input)
+  {
+    int numOfElements = input.readInt(true);
+
+    T[] array = createObjectArray(numOfElements);
+
+    Serde<T> deserializer = getItemSerde();
+    for (int index = 0; index < numOfElements; ++index) {
+      array[index] = deserializer.deserialize(input);
+    }
+    return array;
+  }
+
+  @SuppressWarnings("unchecked")
+  protected T[] createObjectArray(int length)
+  {
+    // an instance from newSerializer() has no element type; fail with a clear message instead of an NPE
+    Preconditions.checkState(itemType != null, "ArraySerde created by newSerializer() can not be used for deserialization");
+    return (T[])Array.newInstance(itemType, length);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
new file mode 100644
index 0000000..c140962
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Block.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Keeps the information of one block: a byte buffer into which serialized objects are written
+ * one after another. An object is written with one or more {@code write()} calls and finished
+ * with {@link #toSlice()}. The buffer may grow only as long as no slice of it has been handed
+ * out; afterwards a write that does not fit fails with {@link OutOfBlockBufferMemoryException}.
+ */
+public class Block
+{
+  /**
+   * Thrown when a write does not fit into the block and the buffer can not be reallocated
+   * because slices referring to it have already been exposed.
+   */
+  public static class OutOfBlockBufferMemoryException extends RuntimeException
+  {
+    private static final long serialVersionUID = 3813792889200989131L;
+  }
+
+  private static final Logger logger = LoggerFactory.getLogger(Block.class);
+
+  public static final int DEFAULT_BLOCK_SIZE = 100000;
+
+  //the capacity of the block
+  private int capacity;
+
+  /*
+   * the size of the data.
+   */
+  private volatile int size;
+
+  //offset in the buffer where the object currently being written begins
+  private int objectBeginOffset = 0;
+  private byte[] buffer;
+
+  /**
+   * whether any slices have been exposed to the caller.
+   */
+  private boolean exposedSlices;
+
+  private Block()
+  {
+    this(DEFAULT_BLOCK_SIZE);
+  }
+
+  /**
+   * @param capacity initial buffer size in bytes, must be positive
+   * @throws IllegalArgumentException if capacity is not positive
+   */
+  public Block(int capacity)
+  {
+    if (capacity <= 0) {
+      throw new IllegalArgumentException("Invalid capacity: " + capacity);
+    }
+    buffer = new byte[capacity];
+    this.capacity = capacity;
+  }
+
+  /**
+   * Appends one byte.
+   *
+   * @throws OutOfBlockBufferMemoryException if the byte does not fit and the buffer can not grow
+   */
+  public void write(byte data)
+  {
+    checkOrReallocateBuffer(1);
+    buffer[size++] = data;
+  }
+
+  /**
+   * Appends the whole array.
+   */
+  public void write(byte[] data)
+  {
+    write(data, 0, data.length);
+  }
+
+  /**
+   * Appends {@code length} bytes of {@code data} starting at {@code offset}.
+   *
+   * @throws OutOfBlockBufferMemoryException if the data does not fit and the buffer can not grow
+   */
+  public void write(byte[] data, final int offset, final int length)
+  {
+    checkOrReallocateBuffer(length);
+
+    System.arraycopy(data, offset, buffer, size, length);
+    size += length;
+  }
+
+  /**
+   * check the buffer size and reallocate if buffer is not enough
+   *
+   * @param length number of bytes about to be written
+   * @throws OutOfBlockBufferMemoryException if more room is needed but slices were already exposed
+   */
+  private void checkOrReallocateBuffer(int length) throws OutOfBlockBufferMemoryException
+  {
+    if (size + length <= capacity) {
+      return;
+    }
+
+    //slices already handed out point into the current array, so it can not be replaced
+    if (exposedSlices) {
+      throw new OutOfBlockBufferMemoryException();
+    }
+
+    //calculate the new capacity
+    capacity = (size + length) * 2;
+
+    byte[] oldBuffer = buffer;
+    buffer = new byte[capacity];
+
+    /**
+     * no slices are exposed in this block yet (this is the first object in this block).
+     * so we can reallocate and move the memory
+     */
+    if (size > 0) {
+      System.arraycopy(oldBuffer, 0, buffer, 0, size);
+    }
+  }
+
+  /**
+   * Similar to toSlice, this method returns the bytes written so far for the current object.
+   * But unlike toSlice(), which indicates that all the writes of this object are done,
+   * this method can be called at any time and does not change the state of the block.
+   */
+  public Slice getLastObjectSlice()
+  {
+    return new Slice(buffer, objectBeginOffset, size - objectBeginOffset);
+  }
+
+  /**
+   * Drops the bytes written since the last toSlice() call by moving the size back to the
+   * beginning of the current object. Does nothing when the current object begins at offset 0.
+   */
+  public void discardLastObjectData()
+  {
+    if (objectBeginOffset == 0) {
+      return;
+    }
+    size = objectBeginOffset;
+  }
+
+  /**
+   * Copies the bytes of the object currently being written into {@code newBlock} and then
+   * discards them from this block. Does nothing if no bytes were written for the current object.
+   */
+  public void moveLastObjectDataTo(Block newBlock)
+  {
+    if (size > objectBeginOffset) {
+      newBlock.write(buffer, objectBeginOffset, size - objectBeginOffset);
+      discardLastObjectData();
+    }
+  }
+
+  /**
+   * This method returns the slice that represents the serialized form.
+   * The process of serializing an object should be one or multiple calls of write() followed by a toSlice() call.
+   * A call to toSlice indicates the writes are done for this object
+   *
+   * @return the slice covering the bytes written since the previous toSlice() call
+   * @throws RuntimeException if nothing was written since the previous toSlice() call
+   */
+  public BufferSlice toSlice()
+  {
+    if (size == objectBeginOffset) {
+      throw new RuntimeException("data size is zero.");
+    }
+    BufferSlice slice = new BufferSlice(buffer, objectBeginOffset, size - objectBeginOffset);
+    //prepare for next object
+    objectBeginOffset = size;
+    exposedSlices = true;
+    return slice;
+  }
+
+  /**
+   * Makes the block empty again; the buffer itself is kept for reuse.
+   */
+  public void reset()
+  {
+    size = 0;
+    objectBeginOffset = 0;
+    exposedSlices = false;
+  }
+
+  /**
+   * check if the block has enough space for the length
+   *
+   * @param length number of bytes intended to be written
+   * @return true if {@code length} more bytes fit without growing the buffer
+   */
+  public boolean hasEnoughSpace(int length)
+  {
+    //same condition the write path uses: data that exactly fills the buffer does fit
+    return size + length <= capacity;
+  }
+
+  public long size()
+  {
+    return size;
+  }
+
+  public long capacity()
+  {
+    return capacity;
+  }
+
+  /**
+   * Returns whether the block is in the state it has after construction or reset():
+   * nothing written and no slice exposed.
+   */
+  public boolean isFresh()
+  {
+    return (size == 0 && objectBeginOffset == 0 && exposedSlices == false);
+  }
+
+  /**
+   * Returns whether the block is clear. The block is clear when there has not been any write calls since the last toSlice() call.
+   *
+   * @return true if no bytes are pending for the current object
+   */
+  public boolean isClear()
+  {
+    return objectBeginOffset == size;
+  }
+
+  /**
+   * Resets the block and drops the reference to the buffer.
+   */
+  public void release()
+  {
+    reset();
+    buffer = null;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
new file mode 100644
index 0000000..f8a097e
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockReleaseStrategy.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+/**
+ * Strategy that decides how many free blocks a stream may release. The expected interaction is:
+ * - The stream reports how many free blocks it has at a regular frequency, usually at the end of each window.
+ * - The stream asks how many blocks it should release. The stream usually releases that many blocks,
+ * but it can make its own decision.
+ * - The stream reports how many blocks it actually released.
+ */
+public interface BlockReleaseStrategy
+{
+ /**
+ * The stream should call this method to report to the strategy how many blocks are currently free.
+ * @param freeBlockNum the current number of free blocks
+ */
+ void currentFreeBlocks(int freeBlockNum);
+
+ /**
+ * Gets how many blocks can be released.
+ * @return the number of blocks that can be released
+ */
+ int getNumBlocksToRelease();
+
+ /**
+ * The stream should call this method to report how many blocks were released.
+ * @param numReleasedBlocks the number of blocks actually released
+ */
+ void releasedBlocks(int numReleasedBlocks);
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
new file mode 100644
index 0000000..ee50f7d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BlockStream.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.io.OutputStream;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A stream is a collection of blocks
+ * BlockStream avoids copying the data that are already exposed to the caller
+ *
+ */
+public class BlockStream extends OutputStream
+{
+ private static final Logger logger = LoggerFactory.getLogger(BlockStream.class);
+
+ //the initial capacity of each block
+ protected final int blockCapacity;
+
+ protected Map<Integer, Block> blocks = Maps.newHashMap();
+ //the index of current block, valid block index should >= 0
+ protected int currentBlockIndex = 0;
+ protected long size = 0;
+
+ protected Block currentBlock;
+
+ public BlockStream()
+ {
+ this(Block.DEFAULT_BLOCK_SIZE);
+ }
+
+ public BlockStream(int blockCapacity)
+ {
+ this.blockCapacity = blockCapacity;
+ }
+
+ @Override
+ public void write(byte[] data)
+ {
+ write(data, 0, data.length);
+ }
+
+ @Override
+ public void write(int b)
+ {
+ currentBlock = getOrCreateCurrentBlock();
+ try {
+ currentBlock.write((byte)b);
+ } catch (Block.OutOfBlockBufferMemoryException e) {
+ reallocateBlock();
+ currentBlock.write((byte)b);
+ }
+ size++;
+ }
+
+ /**
+ * This write could be called multiple times for an object.
+ * The write method makes sure the same object only write to one block
+ *
+ * @param data
+ * @param offset
+ * @param length
+ */
+ @Override
+ public void write(byte[] data, final int offset, final int length)
+ {
+ //start with a block which at least can hold this data
+ currentBlock = getOrCreateCurrentBlock();
+ try {
+ currentBlock.write(data, offset, length);
+ } catch (Block.OutOfBlockBufferMemoryException e) {
+ reallocateBlock();
+ currentBlock.write(data, offset, length);
+ }
+ size += length;
+ }
+
+ private void reallocateBlock()
+ {
+ //use next block
+ Block previousBlock = moveToNextBlock();
+ if (!currentBlock.isFresh()) {
+ throw new RuntimeException("New block is not fresh.");
+ }
+ if (!previousBlock.isClear()) {
+ previousBlock.moveLastObjectDataTo(currentBlock);
+ }
+ }
+
+ /**
+ *
+ * @return The previous block
+ */
+ protected Block moveToNextBlock()
+ {
+ Block previousBlock = currentBlock;
+
+ ++currentBlockIndex;
+ currentBlock = getOrCreateCurrentBlock();
+ if (!currentBlock.isFresh()) {
+ throw new RuntimeException("Assigned non fresh block.");
+ }
+ return previousBlock;
+ }
+
+ protected Block getOrCreateCurrentBlock()
+ {
+ Block block = blocks.get(currentBlockIndex);
+ if (block == null) {
+ block = new Block(blockCapacity);
+ blocks.put(currentBlockIndex, block);
+ }
+ return block;
+ }
+
+ public long size()
+ {
+ return size;
+ }
+
+ public long capacity()
+ {
+ long capacity = 0;
+ for (Block block : blocks.values()) {
+ capacity += block.capacity();
+ }
+ return capacity;
+ }
+
+ /**
+ *
+ * this is the call that represents the end of an object
+ */
+ public Slice toSlice()
+ {
+ return blocks.get(currentBlockIndex).toSlice();
+ }
+
+ /**
+ * resets all blocks
+ */
+ public void reset()
+ {
+ currentBlockIndex = 0;
+ size = 0;
+ for (Block block : blocks.values()) {
+ block.reset();
+ }
+ }
+
+ public void release()
+ {
+ reset();
+ blocks.clear();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
new file mode 100644
index 0000000..5d830fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/BufferSlice.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.getopt.util.hash.MurmurHash;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A {@link Slice} with its own hashCode() and equals(): com.datatorrent.netlet.util.Slice
+ * has a problem with hashCode(), so both are overridden here and are based on the bytes
+ * the slice covers.
+ */
+public class BufferSlice extends Slice
+{
+  private static final long serialVersionUID = -471209532589983329L;
+  public static final BufferSlice EMPTY_SLICE = new BufferSlice(ArrayUtils.EMPTY_BYTE_ARRAY);
+
+  //for Kryo
+  private BufferSlice()
+  {
+    //the default constructor of the super class is private and can not be called.
+    super(null, 0, 0);
+  }
+
+  public BufferSlice(byte[] array, int offset, int length)
+  {
+    super(array, offset, length);
+  }
+
+  public BufferSlice(byte[] array)
+  {
+    super(array);
+  }
+
+  /**
+   * Creates a BufferSlice that refers to the same buffer, offset and length as the given slice.
+   */
+  public BufferSlice(Slice netletSlice)
+  {
+    this(netletSlice.buffer, netletSlice.offset, netletSlice.length);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    final int seed = 5;
+    final int contentHash = MurmurHash.hash(buffer, seed, offset, length);
+    final int result = 59 * seed + contentHash;
+    return 59 * result + this.length;
+  }
+
+  /**
+   * Two slices are equal when they cover the same sequence of bytes. Any
+   * com.datatorrent.netlet.util.Slice is accepted as the other side, not only a BufferSlice.
+   */
+  @Override
+  public boolean equals(Object obj)
+  {
+    //instanceof is false for null as well
+    if (!(obj instanceof Slice)) {
+      return false;
+    }
+    final Slice that = (Slice)obj;
+    if (that.length != this.length) {
+      return false;
+    }
+
+    final byte[] mine = this.buffer;
+    final byte[] theirs = that.buffer;
+    final int begin = this.offset;
+
+    //walk both ranges from their last byte back to their first
+    int myPos = begin + this.length;
+    int theirPos = that.offset + that.length;
+    while (myPos > begin) {
+      if (mine[--myPos] != theirs[--theirPos]) {
+        return false;
+      }
+    }
+
+    return true;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
new file mode 100644
index 0000000..bcd0b74
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/CollectionSerde.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Collection;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes collections.
+ * The serialized form is the element count (variable-length, positive-optimized int) followed
+ * by each element serialized with the item serde.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class CollectionSerde<T, CollectionT extends Collection<T>> implements Serde<CollectionT>
+{
+  @NotNull
+  private Serde<T> serde;
+
+  @NotNull
+  private Class<? extends CollectionT> collectionClass;
+
+  private CollectionSerde()
+  {
+    // for Kryo
+  }
+
+  /**
+   * Creates a {@link CollectionSerde}.
+   * @param serde The {@link Serde} that is used to serialize and deserialize each element of a collection.
+   * @param collectionClass The collection class instantiated (through its no-argument constructor) on deserialization.
+   */
+  public CollectionSerde(@NotNull Serde<T> serde, @NotNull Class<? extends CollectionT> collectionClass)
+  {
+    this.serde = Preconditions.checkNotNull(serde);
+    this.collectionClass = Preconditions.checkNotNull(collectionClass);
+  }
+
+  /**
+   * Writes the element count followed by every element.
+   */
+  @Override
+  public void serialize(CollectionT objects, Output output)
+  {
+    // The count is written even for an empty collection: deserialize() unconditionally reads it,
+    // so skipping it would make an empty collection impossible to read back.
+    output.writeInt(objects.size(), true);
+    Serde<T> serializer = getItemSerde();
+    for (T object : objects) {
+      serializer.serialize(object, output);
+    }
+  }
+
+  /**
+   * Reads the element count, creates a new collection and adds that many deserialized elements to it.
+   * Checked exceptions (for example from instantiating the collection class) are rethrown unchecked.
+   */
+  @Override
+  public CollectionT deserialize(Input input)
+  {
+    int numElements = input.readInt(true);
+
+    try {
+      CollectionT collection = collectionClass.newInstance();
+
+      // go through getItemSerde() like serialize() does, so that an overriding subclass is honored on both paths
+      Serde<T> deserializer = getItemSerde();
+      for (int index = 0; index < numElements; index++) {
+        collection.add(deserializer.deserialize(input));
+      }
+
+      return collection;
+    } catch (Exception ex) {
+      throw Throwables.propagate(ex);
+    }
+  }
+
+  protected Serde<T> getItemSerde()
+  {
+    return serde;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
new file mode 100644
index 0000000..93929e4
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/DefaultBlockReleaseStrategy.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import java.util.Arrays;
+
+import org.apache.commons.collections.buffer.CircularFifoBuffer;
+
+/**
+ * This implementation keeps the most recent reports of the number of free blocks and offers
+ * the minimum of them as the number of blocks to release.
+ *
+ */
+public class DefaultBlockReleaseStrategy implements BlockReleaseStrategy
+{
+  public static final int DEFAULT_PERIOD = 60; // 60 reports
+  //bounded buffer of the last reports; adding to a full buffer drops the oldest report
+  private CircularFifoBuffer freeBlockNumQueue;
+  //NOTE(review): not read anywhere in this class; kept so the set of fields stays unchanged
+  private Integer[] tmpArray;
+
+  public DefaultBlockReleaseStrategy()
+  {
+    this(DEFAULT_PERIOD);
+  }
+
+  /**
+   * @param period the number of most recent reports to keep
+   */
+  public DefaultBlockReleaseStrategy(int period)
+  {
+    freeBlockNumQueue = new CircularFifoBuffer(period);
+    tmpArray = new Integer[period];
+    Arrays.fill(tmpArray, 0);
+  }
+
+  /**
+   * The stream calls this to report to the strategy how many blocks are free currently.
+   * @param freeBlockNum the current number of free blocks, must not be negative
+   * @throws IllegalArgumentException if freeBlockNum is negative
+   */
+  @Override
+  public void currentFreeBlocks(int freeBlockNum)
+  {
+    if (freeBlockNum < 0) {
+      throw new IllegalArgumentException("The number of free blocks could not less than zero.");
+    }
+    freeBlockNumQueue.add(freeBlockNum);
+  }
+
+  /**
+   * Get how many blocks that can be released
+   * @return the minimum of the kept reports; Integer.MAX_VALUE if nothing has been reported yet
+   */
+  @Override
+  public int getNumBlocksToRelease()
+  {
+    int minNum = Integer.MAX_VALUE;
+    for (Object num : freeBlockNumQueue) {
+      minNum = Math.min((Integer)num, minNum);
+    }
+    return minNum;
+  }
+
+
+  /**
+   * report how many blocks that have been released. Every kept report is decreased by that
+   * number, but not below zero.
+   * @param numReleasedBlocks the number of released blocks, must not be negative
+   * @throws IllegalArgumentException if numReleasedBlocks is negative
+   */
+  @Override
+  public void releasedBlocks(int numReleasedBlocks)
+  {
+    if (numReleasedBlocks == 0) {
+      return;
+    }
+    if (numReleasedBlocks < 0) {
+      throw new IllegalArgumentException("Num of released blocks should not be negative");
+    }
+    /**
+     * decrease by released blocks.
+     * The reports are copied out first and the buffer is rebuilt from the copy: adding to the
+     * buffer while iterating over it would move the end of the buffer along with the iterator,
+     * so the loop would either never finish or adjust only a single report.
+     */
+    Object[] reports = freeBlockNumQueue.toArray();
+    freeBlockNumQueue.clear();
+    for (Object num : reports) {
+      freeBlockNumQueue.add(Math.max((Integer)num - numReleasedBlocks, 0));
+    }
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
new file mode 100644
index 0000000..0fbb2ab
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/GenericSerde.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
+ * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or
+ * incompatible changes to the class being serialized.
+ * <p>
+ * When a class is supplied to the constructor, only the object is written and that class is used to read it
+ * back. Without a class, the class is written along with every object.
+ *
+ * @param <T> The type being serialized
+ */
+@InterfaceStability.Evolving
+public class GenericSerde<T> implements Serde<T>
+{
+  // One Kryo instance per thread, created on first use and reused afterwards. This has to override
+  // initialValue() rather than get(): overriding get() bypasses the thread-local storage entirely and
+  // builds a brand new Kryo on every serialize/deserialize call.
+  private transient ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
+  {
+    @Override
+    protected Kryo initialValue()
+    {
+      return new Kryo();
+    }
+  };
+
+  private final Class<? extends T> clazz;
+
+  /**
+   * Creates a serde that writes the class together with each object.
+   */
+  public GenericSerde()
+  {
+    this.clazz = null;
+  }
+
+  /**
+   * Creates a serde that writes objects without their class and reads them back as {@code clazz}.
+   *
+   * @param clazz the class used to read objects back; {@code null} behaves like the no-arg constructor
+   */
+  public GenericSerde(Class<? extends T> clazz)
+  {
+    this.clazz = clazz;
+  }
+
+  /**
+   * Serializes the object to the given output using the calling thread's Kryo instance.
+   *
+   * @param object the object to serialize
+   * @param output the output to write to
+   */
+  @Override
+  public void serialize(T object, Output output)
+  {
+    Kryo kryo = kryos.get();
+    if (clazz == null) {
+      kryo.writeClassAndObject(output, object);
+    } else {
+      kryo.writeObject(output, object);
+    }
+  }
+
+  /**
+   * Deserializes one object from the given input using the calling thread's Kryo instance.
+   *
+   * @param input the input to read from
+   * @return the deserialized object
+   */
+  @Override
+  @SuppressWarnings("unchecked")
+  public T deserialize(Input input)
+  {
+    Kryo kryo = kryos.get();
+    if (clazz == null) {
+      // The class comes from the input itself, so the cast to T cannot be checked here.
+      return (T)kryo.readClassAndObject(input);
+    }
+    return kryo.readObject(input, clazz);
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
new file mode 100644
index 0000000..032b5e0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/IntSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * This is an implementation of {@link Serde} which deserializes and serializes integers.
+ * Values are written with {@code Output.writeInt} and read back with {@code Input.readInt}.
+ * A {@code null} value cannot be serialized: unboxing it throws a {@link NullPointerException}.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class IntSerde implements Serde<Integer>
+{
+ @Override
+ public void serialize(Integer value, Output output)
+ {
+ output.writeInt(value);
+ }
+
+ @Override
+ public Integer deserialize(Input input)
+ {
+ return input.readInt();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
new file mode 100644
index 0000000..a7dfa7f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueByteStreamProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+/**
+ * This interface provides access to the streams that hold serialized keys and values.
+ * An implementation may use separate streams for keys and values or share one stream for both.
+ *
+ */
+public interface KeyValueByteStreamProvider
+{
+ /**
+ * @return The stream that holds the keys
+ */
+ WindowedBlockStream getKeyStream();
+
+ /**
+ * @return The stream that holds the values
+ */
+ WindowedBlockStream getValueStream();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
new file mode 100644
index 0000000..6fbe9fe
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/KeyValueSerdeManager.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.managed.Bucket;
+import org.apache.apex.malhar.lib.state.managed.BucketProvider;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Serializes keys and values into {@link Slice}s using a key serde, a value serde and
+ * {@link SerializationBuffer}s. The write buffers for keys and values are created in
+ * {@link #setup(BucketProvider, long)}; until then they are {@code null}, so
+ * {@code serializeKey(key, true)}, {@link #serializeValue(Object)} and {@link #beginWindow(long)}
+ * must not be called before {@code setup}.
+ *
+ * @param <K> the key type
+ * @param <V> the value type
+ */
+public class KeyValueSerdeManager<K, V>
+{
+ protected Serde<K> keySerde;
+ protected Serde<V> valueSerde;
+
+ protected SerializationBuffer keyBufferForWrite;
+ protected transient SerializationBuffer keyBufferForRead = SerializationBuffer.READ_BUFFER;
+
+ protected SerializationBuffer valueBuffer;
+
+
+ protected KeyValueSerdeManager()
+ {
+ // for Kryo
+ }
+
+ public KeyValueSerdeManager(Serde<K> keySerde, Serde<V> valueSerde)
+ {
+ this.keySerde = keySerde;
+ this.valueSerde = valueSerde;
+ }
+
+ public void setup(BucketProvider bp, long bucketId)
+ {
+ // The bucket does not change for this instance, so the streams are fetched once here;
+ // otherwise the stream would have to be set before every serialize call.
+ Bucket bucketInst = bp.ensureBucket(bucketId);
+ this.valueBuffer = new SerializationBuffer(bucketInst.getValueStream());
+
+ keyBufferForWrite = new SerializationBuffer(bucketInst.getKeyStream());
+ }
+
+ public Slice serializeKey(K key, boolean write)
+ {
+ // write == true uses the bucket-backed write buffer, otherwise the shared read buffer
+ SerializationBuffer buffer = write ? keyBufferForWrite : keyBufferForRead;
+ keySerde.serialize(key, buffer);
+ return buffer.toSlice();
+ }
+
+
+ /**
+ * Serializes a value into the value buffer. Values are only serialized for writing.
+ * @param value the value to serialize
+ * @return the slice produced by the value buffer for the serialized value
+ */
+ public Slice serializeValue(V value)
+ {
+ valueSerde.serialize(value, valueBuffer);
+ return valueBuffer.toSlice();
+ }
+
+ public void beginWindow(long windowId)
+ {
+ keyBufferForWrite.beginWindow(windowId);
+ valueBuffer.beginWindow(windowId);
+ }
+
+ public void resetReadBuffer()
+ {
+ keyBufferForRead.release();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
new file mode 100644
index 0000000..0b63737
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/LongSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * This is an implementation of {@link Serde} which deserializes and serializes longs.
+ * Values are written with {@code Output.writeLong} and read back with {@code Input.readLong}.
+ * A {@code null} value cannot be serialized: unboxing it throws a {@link NullPointerException}.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class LongSerde implements Serde<Long>
+{
+ @Override
+ public void serialize(Long value, Output output)
+ {
+ output.writeLong(value);
+ }
+
+ @Override
+ public Long deserialize(Input input)
+ {
+ return input.readLong();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
new file mode 100644
index 0000000..3190880
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PairSerde.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Preconditions;
+
+/**
+ * This is an implementation of {@link Serde} which serializes and deserializes pairs.
+ * The left element is written first with the first serde, then the right element with the second
+ * serde; deserialization reads them back in the same order and returns an {@link ImmutablePair}.
+ *
+ * @param <T1> the type of the left element
+ * @param <T2> the type of the right element
+ */
+@InterfaceStability.Evolving
+public class PairSerde<T1, T2> implements Serde<Pair<T1, T2>>
+{
+ @NotNull
+ private Serde<T1> serde1;
+ @NotNull
+ private Serde<T2> serde2;
+
+ private PairSerde()
+ {
+ // for Kryo
+ }
+
+ /**
+ * Creates a {@link PairSerde}.
+ * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair
+ * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair
+ * @throws NullPointerException if either serde is {@code null}
+ */
+ public PairSerde(@NotNull Serde<T1> serde1, @NotNull Serde<T2> serde2)
+ {
+ this.serde1 = Preconditions.checkNotNull(serde1);
+ this.serde2 = Preconditions.checkNotNull(serde2);
+ }
+
+ @Override
+ public void serialize(Pair<T1, T2> pair, Output output)
+ {
+ serde1.serialize(pair.getLeft(), output);
+ serde2.serialize(pair.getRight(), output);
+ }
+
+ @Override
+ public Pair<T1, T2> deserialize(Input input)
+ {
+ T1 first = serde1.deserialize(input);
+ T2 second = serde2.deserialize(input);
+ return new ImmutablePair<>(first, second);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
deleted file mode 100644
index 9669981..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * This is a simple pass through {@link Serde}. When serialization is performed the input byte array is returned.
- * Similarly when deserialization is performed the input byte array is returned.
- *
- * @since 3.4.0
- */
-@InterfaceStability.Evolving
-public class PassThruByteArraySerde implements Serde<byte[], byte[]>
-{
- @Override
- public byte[] serialize(byte[] object)
- {
- return object;
- }
-
- @Override
- public byte[] deserialize(byte[] object, MutableInt offset)
- {
- offset.add(object.length);
- return object;
- }
-
- @Override
- public byte[] deserialize(byte[] object)
- {
- return object;
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
deleted file mode 100644
index b22bf6f..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is a simple {@link Serde} which serializes and deserializes byte arrays to {@link Slice}s. A byte array is
- * serialized by simply wrapping it in a {@link Slice} object and deserialized by simply reading the byte array
- * out of the {@link Slice} object.
- *
- * <b>Note:</b> The deserialized method doesn't use the offset argument in this implementation.
- *
- * @since 3.5.0
- */
-public class PassThruByteArraySliceSerde implements Serde<byte[], Slice>
-{
- @Override
- public Slice serialize(byte[] object)
- {
- return new Slice(object);
- }
-
- @Override
- public byte[] deserialize(Slice object, MutableInt offset)
- {
- offset.add(object.length);
-
- if (object.offset == 0) {
- return object.buffer;
- }
-
- byte[] bytes = new byte[object.length];
- System.arraycopy(object.buffer, object.offset, bytes, 0, object.length);
- return bytes;
- }
-
- @Override
- public byte[] deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
index 2646c0e..679e116 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
@@ -18,9 +18,14 @@
*/
package org.apache.apex.malhar.lib.utils.serde;
-import org.apache.commons.lang3.mutable.MutableInt;
+import java.io.IOException;
+
import org.apache.hadoop.classification.InterfaceStability;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+import com.google.common.base.Throwables;
+
import com.datatorrent.netlet.util.Slice;
/**
@@ -30,23 +35,26 @@ import com.datatorrent.netlet.util.Slice;
* @since 3.5.0
*/
@InterfaceStability.Evolving
-public class PassThruSliceSerde implements Serde<Slice, Slice>
+public class PassThruSliceSerde implements Serde<Slice>
{
@Override
- public Slice serialize(Slice object)
- {
- return object;
- }
-
- @Override
- public Slice deserialize(Slice object, MutableInt offset)
+ public void serialize(Slice slice, Output output)
{
- return object;
+ output.write(slice.buffer, slice.offset, slice.length);
}
@Override
- public Slice deserialize(Slice object)
+ public Slice deserialize(Input input)
{
- return object;
+ if (input.getInputStream() != null) {
+ // The input is backed by a stream, cannot directly use its internal buffer
+ try {
+ return new Slice(input.readBytes(input.available()));
+ } catch (IOException ex) {
+ throw Throwables.propagate(ex);
+ }
+ } else {
+ return new Slice(input.getBuffer(), input.position(), input.limit() - input.position());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
index 6e02aee..d09612d 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/Serde.java
@@ -18,46 +18,29 @@
*/
package org.apache.apex.malhar.lib.utils.serde;
-import org.apache.commons.lang3.mutable.MutableInt;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
/**
* This is an interface for a Serializer/Deserializer class.
- * @param <OBJ> The type of the object to Serialize and Deserialize.
- * @param <SER> The type to Serialize an Object to.
+ * @param <T> The type of the object to Serialize and Deserialize.
*
* @since 3.4.0
*/
-public interface Serde<OBJ, SER>
+public interface Serde<T>
{
/**
- * Serialized the given object.
- * @param object The object to serialize.
- * @return The serialized representation of the object.
+ * Serialize the object to the given output.
+ * @param object the object to serialize
+ * @param output the output that the serialized form is written to
*/
- SER serialize(OBJ object);
+ void serialize(T object, Output output);
/**
- * Deserializes the given serialized representation of an object.
- * @param object The serialized representation of an object.
- * @param offset An offset in the serialized representation of the object. After the
- * deserialize method completes the offset is updated, so that the offset points to
- * the remaining unprocessed portion of the serialized object. For example:<br/>
- * {@code
- * Object obj;
- * MutableInt mi;
- * someObj1 = deserialize(obj, mi);
- * someObj2 = deserialize(obj, mi);
- * }
+ * Deserialize from the input and return a new object.
*
- * @return The deserialized object.
+ * @param input the input to read the serialized form from
+ * @return the deserialized object
*/
- OBJ deserialize(SER object, MutableInt offset);
-
- /**
- * Deserializes the given serialized representation of an object.
- * @param object The serialized representation of an object.
- *
- * @return The deserialized object.
- */
- OBJ deserialize(SER object);
+ T deserialize(Input input);
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
deleted file mode 100644
index eca1d5f..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeCollectionSlice.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.util.Collection;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes lists.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeCollectionSlice<T, CollectionT extends Collection<T>> implements Serde<CollectionT, Slice>
-{
- @NotNull
- private Serde<T, Slice> serde;
-
- @NotNull
- private Class<? extends CollectionT> collectionClass;
-
- private SerdeCollectionSlice()
- {
- // for Kryo
- }
-
- /**
- * Creates a {@link SerdeCollectionSlice}.
- * @param serde The {@link Serde} that is used to serialize and deserialize each element of a list.
- */
- public SerdeCollectionSlice(@NotNull Serde<T, Slice> serde, @NotNull Class<? extends CollectionT> collectionClass)
- {
- this.serde = Preconditions.checkNotNull(serde);
- this.collectionClass = Preconditions.checkNotNull(collectionClass);
- }
-
- @Override
- public Slice serialize(CollectionT objects)
- {
- Slice[] slices = new Slice[objects.size()];
-
- int size = 4;
-
- int index = 0;
- for (T object : objects) {
- Slice slice = serde.serialize(object);
- slices[index++] = slice;
- size += slice.length;
- }
-
- byte[] bytes = new byte[size];
- int offset = 0;
-
- byte[] sizeBytes = GPOUtils.serializeInt(objects.size());
- System.arraycopy(sizeBytes, 0, bytes, offset, 4);
- offset += 4;
-
- for (index = 0; index < slices.length; index++) {
- Slice slice = slices[index];
- System.arraycopy(slice.buffer, slice.offset, bytes, offset, slice.length);
- offset += slice.length;
- }
-
- return new Slice(bytes);
- }
-
- @Override
- public CollectionT deserialize(Slice slice, MutableInt offset)
- {
- MutableInt sliceOffset = new MutableInt(slice.offset + offset.intValue());
-
- int numElements = GPOUtils.deserializeInt(slice.buffer, sliceOffset);
- sliceOffset.subtract(slice.offset);
- try {
- CollectionT collection = collectionClass.newInstance();
-
- for (int index = 0; index < numElements; index++) {
- T object = serde.deserialize(slice, sliceOffset);
- collection.add(object);
- }
-
- offset.setValue(sliceOffset.intValue());
- return collection;
- } catch (Exception ex) {
- throw Throwables.propagate(ex);
- }
- }
-
- @Override
- public CollectionT deserialize(Slice slice)
- {
- return deserialize(slice, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
deleted file mode 100644
index 3275a93..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which deserializes and serializes integers.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeIntSlice implements Serde<Integer, Slice>
-{
- @Override
- public Slice serialize(Integer object)
- {
- return new Slice(GPOUtils.serializeInt(object));
- }
-
- @Override
- public Integer deserialize(Slice slice, MutableInt offset)
- {
- int val = GPOUtils.deserializeInt(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
- offset.add(4);
- return val;
- }
-
- @Override
- public Integer deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
deleted file mode 100644
index d4b9488..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeKryoSlice.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import java.io.ByteArrayOutputStream;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * Generic serde using Kryo serialization. Note that while this is convenient, it may not be desirable because
- * using Kryo makes the object being serialized rigid, meaning you won't be able to make backward compatible or
- * incompatible changes to the class being serialized.
- *
- * @param <T> The type being serialized
- */
-@InterfaceStability.Evolving
-public class SerdeKryoSlice<T> implements Serde<T, Slice>
-{
- // Setup ThreadLocal of Kryo instances
- private static final ThreadLocal<Kryo> kryos = new ThreadLocal<Kryo>()
- {
- protected Kryo initialValue()
- {
- Kryo kryo = new Kryo();
- // configure kryo instance, customize settings
- return kryo;
- }
- };
-
- private final Class<? extends T> clazz;
-
- public SerdeKryoSlice()
- {
- this.clazz = null;
- }
-
- public SerdeKryoSlice(Class<? extends T> clazz)
- {
- this.clazz = clazz;
- }
-
- @Override
- public Slice serialize(T object)
- {
- Kryo kryo = kryos.get();
- ByteArrayOutputStream stream = new ByteArrayOutputStream();
- Output output = new Output(stream);
- if (clazz == null) {
- kryo.writeClassAndObject(output, object);
- } else {
- kryo.writeObject(output, object);
- }
- return new Slice(output.toBytes());
- }
-
- @Override
- public T deserialize(Slice slice, MutableInt offset)
- {
- byte[] bytes = slice.toByteArray();
- Kryo kryo = kryos.get();
- Input input = new Input(bytes, offset.intValue(), bytes.length - offset.intValue());
- T object;
- if (clazz == null) {
- object = (T)kryo.readClassAndObject(input);
- } else {
- object = kryo.readObject(input, clazz);
- }
- offset.setValue(bytes.length - input.position());
- return object;
- }
-
- @Override
- public T deserialize(Slice slice)
- {
- return deserialize(slice, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
deleted file mode 100644
index 6fe07d9..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeLongSlice.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which deserializes and serializes integers.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeLongSlice implements Serde<Long, Slice>
-{
- @Override
- public Slice serialize(Long object)
- {
- return new Slice(GPOUtils.serializeLong(object));
- }
-
- @Override
- public Long deserialize(Slice slice, MutableInt offset)
- {
- long val = GPOUtils.deserializeLong(slice.buffer, new MutableInt(slice.offset + offset.intValue()));
- offset.add(8);
- return val;
- }
-
- @Override
- public Long deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
deleted file mode 100644
index 59cf282..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdePairSlice.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import javax.validation.constraints.NotNull;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.google.common.base.Preconditions;
-
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * This is an implementation of {@link Serde} which serializes and deserializes pairs.
- */
-@InterfaceStability.Evolving
-public class SerdePairSlice<T1, T2> implements Serde<Pair<T1, T2>, Slice>
-{
- @NotNull
- private Serde<T1, Slice> serde1;
- @NotNull
- private Serde<T2, Slice> serde2;
-
- private SerdePairSlice()
- {
- // for Kryo
- }
-
- /**
- * Creates a {@link SerdePairSlice}.
- * @param serde1 The {@link Serde} that is used to serialize and deserialize first element of a pair
- * @param serde2 The {@link Serde} that is used to serialize and deserialize second element of a pair
- */
- public SerdePairSlice(@NotNull Serde<T1, Slice> serde1, @NotNull Serde<T2, Slice> serde2)
- {
- this.serde1 = Preconditions.checkNotNull(serde1);
- this.serde2 = Preconditions.checkNotNull(serde2);
- }
-
- @Override
- public Slice serialize(Pair<T1, T2> pair)
- {
- int size = 0;
-
- Slice slice1 = serde1.serialize(pair.getLeft());
- size += slice1.length;
- Slice slice2 = serde2.serialize(pair.getRight());
- size += slice2.length;
-
- byte[] bytes = new byte[size];
- System.arraycopy(slice1.buffer, slice1.offset, bytes, 0, slice1.length);
- System.arraycopy(slice2.buffer, slice2.offset, bytes, slice1.length, slice2.length);
-
- return new Slice(bytes);
- }
-
- @Override
- public Pair<T1, T2> deserialize(Slice slice, MutableInt offset)
- {
- T1 first = serde1.deserialize(slice, offset);
- T2 second = serde2.deserialize(slice, offset);
- return new ImmutablePair<>(first, second);
- }
-
- @Override
- public Pair<T1, T2> deserialize(Slice slice)
- {
- return deserialize(slice, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
deleted file mode 100644
index aaf0d61..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeStringSlice.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.utils.serde;
-
-import org.apache.commons.lang3.mutable.MutableInt;
-import org.apache.hadoop.classification.InterfaceStability;
-
-import com.datatorrent.lib.appdata.gpo.GPOUtils;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * An implementation of {@link Serde} which serializes and deserializes {@link String}s.
- *
- * @since 3.5.0
- */
-@InterfaceStability.Evolving
-public class SerdeStringSlice implements Serde<String, Slice>
-{
- @Override
- public Slice serialize(String object)
- {
- return new Slice(GPOUtils.serializeString(object));
- }
-
- @Override
- public String deserialize(Slice object, MutableInt offset)
- {
- offset.add(object.offset);
- String string = GPOUtils.deserializeString(object.buffer, offset);
- offset.subtract(object.offset);
- return string;
- }
-
- @Override
- public String deserialize(Slice object)
- {
- return deserialize(object, new MutableInt(0));
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
new file mode 100644
index 0000000..f33f1e0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerializationBuffer.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.apex.malhar.lib.state.spillable.WindowListener;
+
+import com.esotericsoftware.kryo.io.Output;
+
+import com.datatorrent.netlet.util.Slice;
+
+public class SerializationBuffer extends Output implements WindowCompleteListener, WindowListener
+{
+ /*
+ * Singleton read buffer for serialization
+ */
+ public static final SerializationBuffer READ_BUFFER = new SerializationBuffer(new WindowedBlockStream());
+
+ private WindowedBlockStream windowedBlockStream;
+
+ @SuppressWarnings("unused")
+ private SerializationBuffer()
+ {
+ this(new WindowedBlockStream());
+ }
+
+ public SerializationBuffer(WindowedBlockStream windowedBlockStream)
+ {
+ super(windowedBlockStream);
+ this.windowedBlockStream = windowedBlockStream;
+ }
+
+ public long size()
+ {
+ return windowedBlockStream.size();
+ }
+
+ public long capacity()
+ {
+ return windowedBlockStream.capacity();
+ }
+
+ /**
+ * This method should be called only after the whole object has been written
+ * @return The slice which represents the object
+ */
+ public Slice toSlice()
+ {
+ this.flush();
+ return windowedBlockStream.toSlice();
+ }
+
+ /**
+ * Resets the underlying block stream so that this buffer can be reused.
+ */
+ public void reset()
+ {
+ windowedBlockStream.reset();
+ }
+
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ windowedBlockStream.beginWindow(windowId);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ windowedBlockStream.endWindow();
+ }
+
+ public void release()
+ {
+ reset();
+ windowedBlockStream.reset();
+ }
+
+ public WindowedBlockStream createWindowedBlockStream()
+ {
+ return new WindowedBlockStream();
+ }
+
+ public WindowedBlockStream createWindowedBlockStream(int capacity)
+ {
+ return new WindowedBlockStream(capacity);
+ }
+
+ public WindowedBlockStream getWindowedBlockStream()
+ {
+ return windowedBlockStream;
+ }
+
+ public void setWindowableByteStream(WindowedBlockStream windowableByteStream)
+ {
+ this.windowedBlockStream = windowableByteStream;
+ }
+
+ /**
+ * Resets all windows whose window id is less than or equal to the input windowId.
+ * This method is not invoked once per window; a single call can reset several windows at the same time.
+ * @param windowId
+ */
+ @Override
+ public void completeWindow(long windowId)
+ {
+ windowedBlockStream.completeWindow(windowId);
+ }
+
+ public byte[] toByteArray()
+ {
+ return toSlice().toByteArray();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
index 2671d5e..b504581 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SliceUtils.java
@@ -100,4 +100,14 @@ public class SliceUtils
return new Slice(bytes);
}
+
+ public static BufferSlice toBufferSlice(Slice slice)
+ {
+ if (slice instanceof BufferSlice) {
+ return (BufferSlice)slice;
+ }
+
+ //The hashCode of a plain Slice is not correct, so convert it to a BufferSlice, which corrects it
+ return new BufferSlice(slice);
+ }
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
new file mode 100644
index 0000000..cb45e2a
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/StringSerde.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * An implementation of {@link Serde} which serializes and deserializes {@link String}s.
+ *
+ * @since 3.5.0
+ */
+@InterfaceStability.Evolving
+public class StringSerde implements Serde<String>
+{
+ @Override
+ public void serialize(String string, Output output)
+ {
+ output.writeString(string);
+ }
+
+ @Override
+ public String deserialize(Input input)
+ {
+ return input.readString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2fa1e6b1/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
new file mode 100644
index 0000000..d2d38a7
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/WindowCompleteListener.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+public interface WindowCompleteListener
+{
+ /**
+ * Notification that all windows whose window id is less than or equal to the input windowId are complete.
+ *
+ * @param windowId
+ */
+ void completeWindow(long windowId);
+}