You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by da...@apache.org on 2016/08/15 19:55:03 UTC
[3/3] apex-malhar git commit: APEXMALHAR-2048 Added implementations
of SpillableList, SpillableMap, and SpillableArrayListMultimap
APEXMALHAR-2048 Added implementations of SpillableList, SpillableMap, and SpillableArrayListMultimap
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/9b6e11d8
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/9b6e11d8
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/9b6e11d8
Branch: refs/heads/master
Commit: 9b6e11d85accc88faa08d7b4a8daeb9b069fc878
Parents: d1fb2b6
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Sun Jul 17 14:32:34 2016 -0700
Committer: David Yan <da...@datatorrent.com>
Committed: Mon Aug 15 12:15:21 2016 -0700
----------------------------------------------------------------------
.../SequentialSpillableIdentifierGenerator.java | 84 +++
.../malhar/lib/state/spillable/Spillable.java | 14 +-
.../state/spillable/SpillableArrayListImpl.java | 326 ++++++++++
.../SpillableByteArrayListMultimapImpl.java | 291 +++++++++
.../state/spillable/SpillableByteMapImpl.java | 235 ++++++++
.../spillable/SpillableComplexComponent.java | 4 +-
.../SpillableComplexComponentImpl.java | 193 ++++++
.../spillable/SpillableIdentifierGenerator.java | 41 ++
.../state/spillable/SpillableStateStore.java | 35 ++
.../state/spillable/TimeBasedPriorityQueue.java | 154 +++++
.../state/spillable/WindowBoundedMapCache.java | 129 ++++
.../lib/state/spillable/WindowListener.java | 42 ++
.../state/spillable/inmem/InMemMultiset.java | 161 -----
.../inmem/InMemSpillableArrayList.java | 175 ------
.../InMemSpillableByteArrayListMultimap.java | 154 -----
.../inmem/InMemSpillableComplexComponent.java | 117 ----
.../inmem/InMemSpillableStateStore.java | 118 ++++
.../ManagedStateSpillableStateStore.java | 34 ++
.../lib/utils/serde/PassThruByteArraySerde.java | 2 +
.../serde/PassThruByteArraySliceSerde.java | 59 ++
.../lib/utils/serde/PassThruSliceSerde.java | 50 ++
.../malhar/lib/utils/serde/SerdeIntSlice.java | 52 ++
.../malhar/lib/utils/serde/SerdeListSlice.java | 109 ++++
.../lib/utils/serde/SerdeStringSlice.java | 53 ++
.../apex/malhar/lib/utils/serde/SliceUtils.java | 101 ++++
.../com/datatorrent/lib/util/TestUtils.java | 25 +
.../state/managed/ManagedStateTestUtils.java | 13 +-
.../spillable/inmem/InMemMultisetTest.java | 44 --
.../inmem/InMemSpillableArrayListTest.java | 44 --
...InMemSpillableByteArrayListMultimapTest.java | 45 --
...uentialSpillableIdentifierGeneratorTest.java | 125 ++++
.../spillable/SpillableArrayListImplTest.java | 594 +++++++++++++++++++
.../SpillableByteArrayListMultimapImplTest.java | 341 +++++++++++
.../spillable/SpillableByteMapImplTest.java | 484 +++++++++++++++
.../SpillableComplexComponentImplTest.java | 63 ++
.../lib/state/spillable/SpillableTestUtils.java | 134 +++++
.../spillable/TimeBasedPriorityQueueTest.java | 134 +++++
.../spillable/WindowBoundedMapCacheTest.java | 116 ++++
.../inmem/InMemorySpillableStateStoreTest.java | 60 ++
.../lib/utils/serde/SerdeListSliceTest.java | 45 ++
40 files changed, 4240 insertions(+), 760 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java
new file mode 100644
index 0000000..600fa98
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SequentialSpillableIdentifierGenerator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+
+/**
+ * This is an id generator that generates single byte ids for Spillable datastructures.
+ */
+@InterfaceStability.Evolving
+public class SequentialSpillableIdentifierGenerator implements SpillableIdentifierGenerator
+{
+ private boolean nextCalled = false;
+ private boolean done = false;
+ private byte currentIdentifier = 0;
+
+ private Set<Byte> registeredIdentifier = Sets.newHashSet();
+
+ @Override
+ public byte[] next()
+ {
+ Preconditions.checkState(!done);
+
+ nextCalled = true;
+
+ byte nextIndentifier = currentIdentifier;
+ seek();
+
+ return new byte[]{nextIndentifier};
+ }
+
+ @Override
+ public void register(byte[] identifierArray)
+ {
+ Preconditions.checkState(!nextCalled);
+ Preconditions.checkState(!done);
+ Preconditions.checkArgument(identifierArray.length == 1);
+
+ byte identifier = identifierArray[0];
+
+ Preconditions.checkState(identifier >= currentIdentifier &&
+ !registeredIdentifier.contains(identifier));
+
+ registeredIdentifier.add(identifier);
+
+ if (currentIdentifier == identifier) {
+ seek();
+ }
+ }
+
+ private void seek()
+ {
+ if (currentIdentifier == Byte.MAX_VALUE) {
+ done = true;
+ } else {
+ do {
+ currentIdentifier++;
+ } while (registeredIdentifier.contains(currentIdentifier) && currentIdentifier < Byte.MAX_VALUE);
+
+ done = currentIdentifier == Byte.MAX_VALUE && registeredIdentifier.contains(currentIdentifier);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
index 41a0efc..4c9b997 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/Spillable.java
@@ -98,19 +98,7 @@ public interface Spillable
* should implement this interface. A user working with an implementation of this interface needs
* to make sure that the {@link com.datatorrent.api.Operator} call-backs are propagated to it.
*/
- interface SpillableComponent extends Component<OperatorContext>, Spillable
+ interface SpillableComponent extends Component<OperatorContext>, Spillable, WindowListener
{
- /**
- * This signals that the parent {@link com.datatorrent.api.Operator}'s
- * {@link com.datatorrent.api.Operator#beginWindow(long)} method has been called.
- * @param windowId The next windowId of the parent operator.
- */
- void beginWindow(long windowId);
-
- /**
- * This signals that the parent {@link com.datatorrent.api.Operator}'s
- * {@link com.datatorrent.api.Operator#endWindow()} method has been called.
- */
- void endWindow();
}
}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
new file mode 100644
index 0000000..5d46906
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableArrayListImpl.java
@@ -0,0 +1,326 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.ListIterator;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SerdeListSlice;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link List} backed by a {@link SpillableStateStore}.
+ * @param <T> The type of object stored in the {@link SpillableArrayListImpl}.
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableArrayListImpl<T> implements Spillable.SpillableArrayList<T>, Spillable.SpillableComponent
+{
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ private long bucketId;
+ private byte[] prefix;
+
+ @NotNull
+ private SpillableStateStore store;
+ @NotNull
+ private Serde<T, Slice> serde;
+ @NotNull
+ private SpillableByteMapImpl<Integer, List<T>> map;
+
+ private boolean sizeCached = false;
+ private int size;
+ private int numBatches;
+
+ private SpillableArrayListImpl()
+ {
+ //for kryo
+ }
+
+ public SpillableStateStore getStore()
+ {
+ return store;
+ }
+
+ /**
+ * Creates a {@link SpillableArrayListImpl}.
+ * @param bucketId The Id of the bucket used to store this
+ * {@link SpillableArrayListImpl} in the provided {@link SpillableStateStore}.
+ * @param prefix The Id of this {@link SpillableArrayListImpl}.
+ * @param store The {@link SpillableStateStore} in which to spill to.
+ * @param serde The {@link Serde} to use when serializing and deserializing data.
+ */
+ public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
+ @NotNull SpillableStateStore store,
+ @NotNull Serde<T, Slice> serde)
+ {
+ this.bucketId = bucketId;
+ this.prefix = Preconditions.checkNotNull(prefix);
+ this.store = Preconditions.checkNotNull(store);
+ this.serde = Preconditions.checkNotNull(serde);
+
+ map = new SpillableByteMapImpl<>(store, prefix, bucketId, new SerdeIntSlice(), new SerdeListSlice(serde));
+ }
+
+ /**
+ * Creates a {@link SpillableArrayListImpl}.
+ * @param bucketId The Id of the bucket used to store this
+ * {@link SpillableArrayListImpl} in the provided {@link SpillableStateStore}.
+ * @param prefix The Id of this {@link SpillableArrayListImpl}.
+ * @param store The {@link SpillableStateStore} in which to spill to.
+ * @param serde The {@link Serde} to use when serializing and deserializing data.
+ * @param batchSize When spilled to a {@link SpillableStateStore} data is stored in a batch. This determines the
+ * number of elements a batch will contain when it's spilled. Having small batches will increase
+ * the number of keys stored by your {@link SpillableStateStore} but will improve random reads and
+ * writes. Increasing the batch size will improve sequential read and write speed.
+ */
+ public SpillableArrayListImpl(long bucketId, @NotNull byte[] prefix,
+ @NotNull SpillableStateStore store,
+ @NotNull Serde<T, Slice> serde,
+ int batchSize)
+ {
+ this(bucketId, prefix, store, serde);
+
+ Preconditions.checkArgument(this.batchSize > 0);
+ this.batchSize = batchSize;
+ }
+
+ public void setSize(int size)
+ {
+ Preconditions.checkArgument(size >= 0);
+ this.size = size;
+ }
+
+ @Override
+ public int size()
+ {
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return size == 0;
+ }
+
+ @Override
+ public boolean contains(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Iterator<T> iterator()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Object[] toArray()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public <T1> T1[] toArray(T1[] t1s)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean add(T t)
+ {
+ Preconditions.checkArgument((size() + 1) > 0);
+
+ int batchIndex = (size / batchSize);
+
+ List<T> batch = null;
+
+ if (batchIndex == numBatches) {
+ batch = Lists.newArrayListWithCapacity(batchSize);
+ numBatches++;
+ } else {
+ batch = map.get(batchIndex);
+ }
+
+ batch.add(t);
+
+ size++;
+ map.put(batchIndex, batch);
+ return true;
+ }
+
+ @Override
+ public boolean remove(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> collection)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends T> collection)
+ {
+ for (T element: collection) {
+ add(element);
+ }
+
+ return true;
+ }
+
+ @Override
+ public boolean addAll(int i, Collection<? extends T> collection)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> collection)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> collection)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T get(int i)
+ {
+ if (!(i < size)) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ int batchIndex = i / batchSize;
+ int batchOffset = i % batchSize;
+
+ List<T> batch = map.get(batchIndex);
+ return batch.get(batchOffset);
+ }
+
+ @Override
+ public T set(int i, T t)
+ {
+ if (!(i < size)) {
+ throw new IndexOutOfBoundsException();
+ }
+
+ int batchIndex = i / batchSize;
+ int batchOffset = i % batchSize;
+
+ List<T> batch = map.get(batchIndex);
+ T old = batch.get(batchOffset);
+ batch.set(batchOffset, t);
+ map.put(batchIndex, batch);
+ return old;
+ }
+
+ @Override
+ public void add(int i, T t)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public T remove(int i)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int indexOf(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int lastIndexOf(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListIterator<T> listIterator()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ListIterator<T> listIterator(int i)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<T> subList(int i, int i1)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ map.setup(context);
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ map.beginWindow(windowId);
+ }
+
+ @Override
+ public void endWindow()
+ {
+ map.endWindow();
+ }
+
+ @Override
+ public void teardown()
+ {
+ map.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
new file mode 100644
index 0000000..ba0bb77
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteArrayListMultimapImpl.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.PassThruByteArraySliceSerde;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SerdeIntSlice;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Multimap;
+import com.google.common.collect.Multiset;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is an implementation of Guava's ListMultimap which spills data to a {@link SpillableStateStore}.
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableByteArrayListMultimapImpl<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>,
+ Spillable.SpillableComponent
+{
+ public static final int DEFAULT_BATCH_SIZE = 1000;
+ public static final byte[] SIZE_KEY_SUFFIX = new byte[]{(byte)0, (byte)0, (byte)0};
+
+ private transient WindowBoundedMapCache<K, SpillableArrayListImpl<V>> cache = new WindowBoundedMapCache<>();
+ private transient boolean isRunning = false;
+ private transient boolean isInWindow = false;
+
+ private int batchSize = DEFAULT_BATCH_SIZE;
+ @NotNull
+ private SpillableByteMapImpl<byte[], Integer> map;
+ private SpillableStateStore store;
+ private byte[] identifier;
+ private long bucket;
+ private Serde<K, Slice> serdeKey;
+ private Serde<V, Slice> serdeValue;
+
+ private SpillableByteArrayListMultimapImpl()
+ {
+ // for kryo
+ }
+
+ /**
+ * Creates a {@link SpillableByteArrayListMultimapImpl}.
+ * @param store The {@link SpillableStateStore} in which to spill to.
+ * @param identifier The Id of this {@link SpillableByteArrayListMultimapImpl}.
+ * @param bucket The Id of the bucket used to store this
+ * {@link SpillableByteArrayListMultimapImpl} in the provided {@link SpillableStateStore}.
+ * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+ * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+ */
+ public SpillableByteArrayListMultimapImpl(SpillableStateStore store, byte[] identifier, long bucket,
+ Serde<K, Slice> serdeKey,
+ Serde<V, Slice> serdeValue)
+ {
+ this.store = Preconditions.checkNotNull(store);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.bucket = bucket;
+ this.serdeKey = Preconditions.checkNotNull(serdeKey);
+ this.serdeValue = Preconditions.checkNotNull(serdeValue);
+
+ map = new SpillableByteMapImpl(store, identifier, bucket, new PassThruByteArraySliceSerde(), new SerdeIntSlice());
+ }
+
+ public SpillableStateStore getStore()
+ {
+ return store;
+ }
+
+ @Override
+ public List<V> get(@Nullable K key)
+ {
+ return getHelper(key);
+ }
+
+ private SpillableArrayListImpl<V> getHelper(@Nullable K key)
+ {
+ SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+
+ if (spillableArrayList == null) {
+ Slice keySlice = serdeKey.serialize(key);
+ Integer size = map.get(SliceUtils.concatenate(keySlice, SIZE_KEY_SUFFIX).toByteArray());
+
+ if (size == null) {
+ return null;
+ }
+
+ Slice keyPrefix = SliceUtils.concatenate(identifier, keySlice);
+ spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+ spillableArrayList.setSize(size);
+ }
+
+ cache.put(key, spillableArrayList);
+
+ return spillableArrayList;
+ }
+
+ @Override
+ public Set<K> keySet()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Multiset<K> keys()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<V> values()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<Map.Entry<K, V>> entries()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public List<V> removeAll(@Nullable Object key)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int size()
+ {
+ return map.size();
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return map.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(@Nullable Object key)
+ {
+ return cache.contains((K)key) || map.containsKey(SliceUtils.concatenate(serdeKey.serialize((K)key),
+ SIZE_KEY_SUFFIX).toByteArray());
+ }
+
+ @Override
+ public boolean containsValue(@Nullable Object value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean containsEntry(@Nullable Object key, @Nullable Object value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean put(@Nullable K key, @Nullable V value)
+ {
+ SpillableArrayListImpl<V> spillableArrayList = getHelper(key);
+
+ if (spillableArrayList == null) {
+ Slice keyPrefix = SliceUtils.concatenate(identifier, serdeKey.serialize(key));
+ spillableArrayList = new SpillableArrayListImpl<V>(bucket, keyPrefix.toByteArray(), store, serdeValue);
+
+ cache.put(key, spillableArrayList);
+ }
+
+ spillableArrayList.add(value);
+ return true;
+ }
+
+ @Override
+ public boolean remove(@Nullable Object key, @Nullable Object value)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean putAll(@Nullable K key, Iterable<? extends V> values)
+ {
+ boolean changed = false;
+
+ for (V value: values) {
+ changed |= put(key, value);
+ }
+
+ return changed;
+ }
+
+ @Override
+ public boolean putAll(Multimap<? extends K, ? extends V> multimap)
+ {
+ boolean changed = false;
+
+ for (Map.Entry<? extends K, ? extends V> entry: multimap.entries()) {
+ changed |= put(entry.getKey(), entry.getValue());
+ }
+
+ return changed;
+ }
+
+ @Override
+ public List<V> replaceValues(K key, Iterable<? extends V> values)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Map<K, Collection<V>> asMap()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ map.setup(context);
+ isRunning = true;
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ map.beginWindow(windowId);
+ isInWindow = true;
+ }
+
+ @Override
+ public void endWindow()
+ {
+ isInWindow = false;
+ for (K key: cache.getChangedKeys()) {
+
+ SpillableArrayListImpl<V> spillableArrayList = cache.get(key);
+ spillableArrayList.endWindow();
+
+ Integer size = map.put(SliceUtils.concatenate(serdeKey.serialize(key), SIZE_KEY_SUFFIX).toByteArray(),
+ spillableArrayList.size());
+ }
+
+ Preconditions.checkState(cache.getRemovedKeys().isEmpty());
+ cache.endWindow();
+ map.endWindow();
+ }
+
+ @Override
+ public void teardown()
+ {
+ isRunning = false;
+ map.teardown();
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
new file mode 100644
index 0000000..da313ee
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableByteMapImpl.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.apex.malhar.lib.utils.serde.SliceUtils;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A Spillable implementation of {@link Map}
+ * @param <K> The types of keys.
+ * @param <V> The types of values.
+ */
+@DefaultSerializer(FieldSerializer.class)
+@InterfaceStability.Evolving
+public class SpillableByteMapImpl<K, V> implements Spillable.SpillableByteMap<K, V>, Spillable.SpillableComponent,
+ Serializable
+{
+ private transient WindowBoundedMapCache<K, V> cache = new WindowBoundedMapCache<>();
+ private transient MutableInt tempOffset = new MutableInt();
+
+ @NotNull
+ private SpillableStateStore store;
+ @NotNull
+ private byte[] identifier;
+ private long bucket;
+ @NotNull
+ private Serde<K, Slice> serdeKey;
+ @NotNull
+ private Serde<V, Slice> serdeValue;
+
+ private int size = 0;
+
+ private SpillableByteMapImpl()
+ {
+ //for kryo
+ }
+
+ /**
+ * Creats a {@link SpillableByteMapImpl}.
+ * @param store The {@link SpillableStateStore} in which to spill to.
+ * @param identifier The Id of this {@link SpillableByteMapImpl}.
+ * @param bucket The Id of the bucket used to store this
+ * {@link SpillableByteMapImpl} in the provided {@link SpillableStateStore}.
+ * @param serdeKey The {@link Serde} to use when serializing and deserializing keys.
+ * @param serdeKey The {@link Serde} to use when serializing and deserializing values.
+ */
+ public SpillableByteMapImpl(SpillableStateStore store, byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+ Serde<V, Slice> serdeValue)
+ {
+ this.store = Preconditions.checkNotNull(store);
+ this.identifier = Preconditions.checkNotNull(identifier);
+ this.bucket = bucket;
+ this.serdeKey = Preconditions.checkNotNull(serdeKey);
+ this.serdeValue = Preconditions.checkNotNull(serdeValue);
+ }
+
+ public SpillableStateStore getStore()
+ {
+ return this.store;
+ }
+
+ @Override
+ public int size()
+ {
+ return size;
+ }
+
+ @Override
+ public boolean isEmpty()
+ {
+ return size == 0;
+ }
+
+ @Override
+ public boolean containsKey(Object o)
+ {
+ return get(o) != null;
+ }
+
+ @Override
+ public boolean containsValue(Object o)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public V get(Object o)
+ {
+ K key = (K)o;
+
+ if (cache.getRemovedKeys().contains(key)) {
+ return null;
+ }
+
+ V val = cache.get(key);
+
+ if (val != null) {
+ return val;
+ }
+
+ Slice valSlice = store.getSync(bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)));
+
+ if (valSlice == null || valSlice == BucketedState.EXPIRED || valSlice.length == 0) {
+ return null;
+ }
+
+ tempOffset.setValue(0);
+ return serdeValue.deserialize(valSlice, tempOffset);
+ }
+
+ @Override
+ public V put(K k, V v)
+ {
+ V value = get(k);
+
+ if (value == null) {
+ size++;
+ }
+
+ cache.put(k, v);
+
+ return value;
+ }
+
+ @Override
+ public V remove(Object o)
+ {
+ V value = get(o);
+
+ if (value != null) {
+ size--;
+ }
+
+ cache.remove((K)o);
+
+ return value;
+ }
+
+ @Override
+ public void putAll(Map<? extends K, ? extends V> map)
+ {
+ for (Map.Entry<? extends K, ? extends V> entry : map.entrySet()) {
+ put(entry.getKey(), entry.getValue());
+ }
+ }
+
+ @Override
+ public void clear()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<K> keySet()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Collection<V> values()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Set<Entry<K, V>> entrySet()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ }
+
+ @Override
+ public void endWindow()
+ {
+ for (K key: cache.getChangedKeys()) {
+ store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
+ serdeValue.serialize(cache.get(key)));
+ }
+
+ for (K key: cache.getRemovedKeys()) {
+ store.put(this.bucket, SliceUtils.concatenate(identifier, serdeKey.serialize(key)),
+ new Slice(ArrayUtils.EMPTY_BYTE_ARRAY));
+ }
+
+ cache.endWindow();
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
index 29da3f5..c63c7ef 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponent.java
@@ -23,6 +23,7 @@ import org.apache.apex.malhar.lib.utils.serde.Serde;
import com.datatorrent.api.Component;
import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
import com.datatorrent.netlet.util.Slice;
/**
@@ -31,7 +32,8 @@ import com.datatorrent.netlet.util.Slice;
*
* @since 3.4.0
*/
-public interface SpillableComplexComponent extends Component<OperatorContext>, SpillableComponent
+public interface SpillableComplexComponent extends Component<OperatorContext>, SpillableComponent,
+ Operator.CheckpointNotificationListener
{
/**
* This is a method for creating a {@link SpillableArrayList}. This method
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
new file mode 100644
index 0000000..b31adfd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableComplexComponentImpl.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.utils.serde.Serde;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is a factory that is used for Spillable datastructures. This component is used by nesting it inside of an
+ * operator and forwarding the appropriate operator callbacks are called on the {@link SpillableComplexComponentImpl}.
+ * Spillable datastructures are created by called the appropriate factory methods on the
+ * {@link SpillableComplexComponentImpl} in the setup method of an operator.
+ */
+@InterfaceStability.Evolving
+public class SpillableComplexComponentImpl implements SpillableComplexComponent
+{
+ private List<SpillableComponent> componentList = Lists.newArrayList();
+
+ @NotNull
+ private SpillableStateStore store;
+
+ @NotNull
+ private SpillableIdentifierGenerator identifierGenerator;
+
+ private SpillableComplexComponentImpl()
+ {
+ // for kryo
+ }
+
+ public SpillableComplexComponentImpl(SpillableStateStore store)
+ {
+ this(store, new SequentialSpillableIdentifierGenerator());
+ }
+
+ public SpillableComplexComponentImpl(SpillableStateStore store, SpillableIdentifierGenerator identifierGenerator)
+ {
+ this.store = Preconditions.checkNotNull(store);
+ this.identifierGenerator = Preconditions.checkNotNull(identifierGenerator);
+ }
+
+ public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
+ {
+ SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifierGenerator.next(), store, serde);
+ componentList.add(list);
+ return list;
+ }
+
+ public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ {
+ identifierGenerator.register(identifier);
+ SpillableArrayListImpl<T> list = new SpillableArrayListImpl<T>(bucket, identifier, store, serde);
+ componentList.add(list);
+ return list;
+ }
+
+ public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
+ Serde<V, Slice> serdeValue)
+ {
+ SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifierGenerator.next(),
+ bucket, serdeKey, serdeValue);
+ componentList.add(map);
+ return map;
+ }
+
+ public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket, Serde<K, Slice> serdeKey,
+ Serde<V, Slice> serdeValue)
+ {
+ identifierGenerator.register(identifier);
+ SpillableByteMapImpl<K, V> map = new SpillableByteMapImpl<K, V>(store, identifier, bucket, serdeKey, serdeValue);
+ componentList.add(map);
+ return map;
+ }
+
+ public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket, Serde<K,
+ Slice> serdeKey, Serde<V, Slice> serdeValue)
+ {
+ SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store,
+ identifierGenerator.next(), bucket, serdeKey, serdeValue);
+ componentList.add(map);
+ return map;
+ }
+
+ public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier, long bucket,
+ Serde<K, Slice> serdeKey,
+ Serde<V, Slice> serdeValue)
+ {
+ identifierGenerator.register(identifier);
+ SpillableByteArrayListMultimapImpl<K, V> map = new SpillableByteArrayListMultimapImpl<K, V>(store,
+ identifier, bucket, serdeKey, serdeValue);
+ componentList.add(map);
+ return map;
+ }
+
+ public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
+ {
+ throw new UnsupportedOperationException("Unsupported Operation");
+ }
+
+ public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ {
+ throw new UnsupportedOperationException("Unsupported Operation");
+ }
+
+ public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
+ {
+ throw new UnsupportedOperationException("Unsupported Operation");
+ }
+
+ public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
+ {
+ throw new UnsupportedOperationException("Unsupported Operation");
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ store.setup(context);
+ for (SpillableComponent spillableComponent: componentList) {
+ spillableComponent.setup(context);
+ }
+ }
+
+ @Override
+ public void beginWindow(long windowId)
+ {
+ store.beginWindow(windowId);
+ for (SpillableComponent spillableComponent: componentList) {
+ spillableComponent.beginWindow(windowId);
+ }
+ }
+
+ @Override
+ public void endWindow()
+ {
+ for (SpillableComponent spillableComponent: componentList) {
+ spillableComponent.endWindow();
+ }
+ store.endWindow();
+ }
+
+ @Override
+ public void teardown()
+ {
+ for (SpillableComponent spillableComponent: componentList) {
+ spillableComponent.teardown();
+ }
+ store.teardown();
+ }
+
+ @Override
+ public void beforeCheckpoint(long l)
+ {
+ store.beforeCheckpoint(l);
+ }
+
+ @Override
+ public void checkpointed(long l)
+ {
+ store.checkpointed(l);
+ }
+
+ @Override
+ public void committed(long l)
+ {
+ store.committed(l);
+ }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java
new file mode 100644
index 0000000..17a52f0
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableIdentifierGenerator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A source of identifiers for Spillable data structures. It is mainly used by implementations of
+ * {@link SpillableComplexComponent} to name the data structures they create.
+ */
+@InterfaceStability.Evolving
+public interface SpillableIdentifierGenerator
+{
+  /**
+   * Generates the next valid identifier for a Spillable data structure.
+   * @return A byte array holding the next valid identifier for a Spillable data structure.
+   */
+  byte[] next();
+
+  /**
+   * Registers an identifier that was chosen outside this generator.
+   * NOTE(review): presumably this lets the generator avoid later returning the same identifier from
+   * {@link #next()}. Confirm against the implementations.
+   * @param identifier The identifier to register with this {@link SpillableIdentifierGenerator}.
+   */
+  void register(byte[] identifier);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
new file mode 100644
index 0000000..1db0eeb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/SpillableStateStore.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.Operator;
+
+/**
+ * The storage backend through which Spillable data structures spill their data to disk.
+ * <p>
+ * In addition to the bucketed key/value access inherited from {@link BucketedState}, a store takes part in the
+ * operator lifecycle. It is a {@link Component} with setup and teardown, a {@link WindowListener} that observes
+ * window boundaries, and an {@link Operator.CheckpointNotificationListener} that receives checkpoint notifications.
+ */
+@InterfaceStability.Evolving
+public interface SpillableStateStore extends BucketedState, Component<Context.OperatorContext>,
+    Operator.CheckpointNotificationListener, WindowListener
+{
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
new file mode 100644
index 0000000..025c501
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/TimeBasedPriorityQueue.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * A simple priority queue where the priority of an object is determined by the time at which it is inserted into the
+ * queue. The object in the queue with the smallest time stamp is the first to be dequeued.
+ * @param <T> The type of the objects inserted into the queue.
+ */
+@InterfaceStability.Evolving
+public class TimeBasedPriorityQueue<T>
+{
+ private Map<T, TimeWrapper<T>> timeWrappperMap = Maps.newHashMap();
+ private Set<TimeWrapper<T>> sortedTimestamp = Sets.newTreeSet();
+
+ public void upSert(T value)
+ {
+ TimeWrapper<T> timeWrapper = timeWrappperMap.get(value);
+
+ if (timeWrapper != null) {
+ sortedTimestamp.remove(timeWrapper);
+ timeWrapper.setTimestamp(System.currentTimeMillis());
+ } else {
+ timeWrapper = new TimeWrapper<>(value, System.currentTimeMillis());
+ timeWrappperMap.put(value, timeWrapper);
+ }
+
+ sortedTimestamp.add(timeWrapper);
+ }
+
+ public void remove(T value)
+ {
+ TimeWrapper<T> timeWrapper = timeWrappperMap.get(value);
+ sortedTimestamp.remove(timeWrapper);
+ timeWrappperMap.remove(value);
+ }
+
+ public Set<T> removeLRU(int count)
+ {
+ Preconditions.checkArgument(count > 0 && count <= timeWrappperMap.size());
+
+ Iterator<TimeWrapper<T>> iterator = sortedTimestamp.iterator();
+ Set<T> valueSet = Sets.newHashSet();
+
+ for (int counter = 0; counter < count; counter++) {
+ T value = iterator.next().getKey();
+ valueSet.add(value);
+ timeWrappperMap.remove(value);
+ iterator.remove();
+ }
+
+ return valueSet;
+ }
+
+ protected static class TimeWrapper<T> implements Comparable<TimeWrapper<T>>
+ {
+ private T key;
+ private long timestamp;
+
+ public TimeWrapper(T key, long timestamp)
+ {
+ this.key = Preconditions.checkNotNull(key);
+ this.timestamp = timestamp;
+ }
+
+ public T getKey()
+ {
+ return key;
+ }
+
+ public long getTimestamp()
+ {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp)
+ {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public int compareTo(TimeWrapper<T> timeWrapper)
+ {
+ if (this.timestamp < timeWrapper.getTimestamp()) {
+ return -1;
+ } else if (this.timestamp > timeWrapper.getTimestamp()) {
+ return 1;
+ }
+
+ return 0;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ TimeWrapper<?> that = (TimeWrapper<?>)o;
+
+ return key.equals(that.key);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return key.hashCode();
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TimeWrapper{" +
+ "key=" + key +
+ ", timestamp=" + timestamp +
+ '}';
+ }
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(TimeBasedPriorityQueue.class);
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
new file mode 100644
index 0000000..fcf219d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowBoundedMapCache.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+/**
+ * An LRU cache with a soft maximum size.
+ * <p>
+ * Puts are never rejected, so the cache may temporarily hold more than its maximum number of entries. When
+ * {@link #endWindow()} is called, the least recently put entries are evicted until the cache is back within its
+ * maximum size.
+ * <p>
+ * The cache also tracks which keys were put ({@link #getChangedKeys()}) and removed ({@link #getRemovedKeys()})
+ * during the current window. Callers can persist those changes before {@link #endWindow()} resets the tracking.
+ * {@link #put}, {@link #get} and {@link #remove} reject null keys, and {@link #put} rejects null values.
+ * @param <K> The type of the keys.
+ * @param <V> The type of the values.
+ */
+@InterfaceStability.Evolving
+public class WindowBoundedMapCache<K, V>
+{
+  public static final int DEFAULT_MAX_SIZE = 50000;
+
+  private int maxSize = DEFAULT_MAX_SIZE;
+
+  private Map<K, V> cache = Maps.newHashMap();
+
+  private Set<K> changedKeys = Sets.newHashSet();
+  private Set<K> removedKeys = Sets.newHashSet();
+  private TimeBasedPriorityQueue<K> priorityQueue = new TimeBasedPriorityQueue<>();
+
+  /**
+   * Creates a cache with a maximum size of {@link #DEFAULT_MAX_SIZE}.
+   */
+  public WindowBoundedMapCache()
+  {
+  }
+
+  /**
+   * Creates a cache with the given maximum size.
+   * @param maxSize The number of entries kept after {@link #endWindow()}; must be positive.
+   * @throws IllegalArgumentException if maxSize is not positive.
+   */
+  public WindowBoundedMapCache(int maxSize)
+  {
+    Preconditions.checkArgument(maxSize > 0);
+
+    this.maxSize = maxSize;
+  }
+
+  /**
+   * Caches the given value, marks the key as changed in this window and makes it the most recently used entry.
+   * @throws NullPointerException if key or value is null.
+   */
+  public void put(K key, V value)
+  {
+    Preconditions.checkNotNull(key);
+    Preconditions.checkNotNull(value);
+
+    removedKeys.remove(key);
+    changedKeys.add(key);
+    priorityQueue.upSert(key);
+
+    cache.put(key, value);
+  }
+
+  /**
+   * Returns the cached value for the key, or null if the key is not cached. Does not affect recency.
+   * @throws NullPointerException if key is null.
+   */
+  public V get(K key)
+  {
+    Preconditions.checkNotNull(key);
+
+    return cache.get(key);
+  }
+
+  /**
+   * Returns whether the key is currently cached.
+   */
+  public boolean contains(K key)
+  {
+    return cache.containsKey(key);
+  }
+
+  /**
+   * Drops the key from the cache, if present, and records it as removed in this window.
+   * <p>
+   * The removal is recorded even when the key is not currently cached. An entry evicted by an earlier
+   * {@link #endWindow()} may still exist in a backing store. Callers such as SpillableByteMapImpl.endWindow() write a
+   * deletion for every key in {@link #getRemovedKeys()}, so skipping the record would leave such a key alive there.
+   * @throws NullPointerException if key is null.
+   */
+  public void remove(K key)
+  {
+    Preconditions.checkNotNull(key);
+
+    // Values are never null, so a non-null result means the key was cached and is tracked by the priority queue.
+    if (cache.remove(key) != null) {
+      priorityQueue.remove(key);
+    }
+
+    changedKeys.remove(key);
+    removedKeys.add(key);
+  }
+
+  /**
+   * Returns the live set of keys put during the current window; {@link #endWindow()} replaces it with a new set.
+   */
+  public Set<K> getChangedKeys()
+  {
+    return changedKeys;
+  }
+
+  /**
+   * Returns the live set of keys removed during the current window; {@link #endWindow()} replaces it with a new set.
+   */
+  public Set<K> getRemovedKeys()
+  {
+    return removedKeys;
+  }
+
+  /*
+  Note: beginWindow is intentionally not implemented because many users need a cache that does not require
+  beginWindow to be called.
+  */
+
+  /**
+   * Evicts the least recently put entries until at most the maximum number remain, then starts fresh change and
+   * removal tracking for the next window.
+   */
+  public void endWindow()
+  {
+    int count = cache.size() - maxSize;
+
+    if (count > 0) {
+      Set<K> expiredKeys = priorityQueue.removeLRU(count);
+
+      for (K expiredKey : expiredKeys) {
+        cache.remove(expiredKey);
+      }
+    }
+
+    changedKeys = Sets.newHashSet();
+    removedKeys = Sets.newHashSet();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java
new file mode 100644
index 0000000..fa8cd9f
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/WindowListener.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.api.Operator;
+
+/**
+ * A callback interface for components that need to observe the window boundaries of the {@link Operator} that owns
+ * them, that is, its {@link Operator#beginWindow(long)} and {@link Operator#endWindow()} callbacks.
+ */
+@InterfaceStability.Evolving
+public interface WindowListener
+{
+  /**
+   * Invoked when the owning {@link Operator}'s {@link Operator#beginWindow(long)} callback runs.
+   * @param windowId The id of the application window that is starting.
+   */
+  void beginWindow(long windowId);
+
+  /**
+   * Invoked when the owning {@link Operator}'s {@link Operator#endWindow()} callback runs.
+   */
+  void endWindow();
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
deleted file mode 100644
index fa7bf08..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemMultiset.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable.inmem;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.HashMultiset;
-
-/**
- * An in memory implementation of the {@link Spillable.SpillableByteMultiset} interface.
- * <p>
- * Every operation delegates directly to the wrapped {@link HashMultiset}; no data is written to a store.
- * @param <T> The type of the data stored in the {@link InMemMultiset}
- */
-public class InMemMultiset<T> implements Spillable.SpillableByteMultiset<T>
-{
- // Kryo serializes this field with its JavaSerializer (standard Java serialization) rather than field by field.
- @FieldSerializer.Bind(JavaSerializer.class)
- private HashMultiset<T> multiset = HashMultiset.create();
-
- @Override
- public int count(@Nullable Object element)
- {
- return multiset.count(element);
- }
-
- @Override
- public int add(@Nullable T element, int occurrences)
- {
- return multiset.add(element, occurrences);
- }
-
- @Override
- public int remove(@Nullable Object element, int occurrences)
- {
- return multiset.remove(element, occurrences);
- }
-
- @Override
- public int setCount(T element, int count)
- {
- return multiset.setCount(element, count);
- }
-
- @Override
- public boolean setCount(T element, int oldCount, int newCount)
- {
- return multiset.setCount(element, oldCount, newCount);
- }
-
- @Override
- public Set<T> elementSet()
- {
- // Returns the wrapped multiset's own set; no copy is made here.
- return multiset.elementSet();
- }
-
- @Override
- public Set<Entry<T>> entrySet()
- {
- // Returns the wrapped multiset's own set; no copy is made here.
- return multiset.entrySet();
- }
-
- @Override
- public Iterator<T> iterator()
- {
- return multiset.iterator();
- }
-
- @Override
- public Object[] toArray()
- {
- return multiset.toArray();
- }
-
- @Override
- public <T1> T1[] toArray(T1[] t1s)
- {
- return multiset.toArray(t1s);
- }
-
- @Override
- public int size()
- {
- return multiset.size();
- }
-
- @Override
- public boolean isEmpty()
- {
- return multiset.isEmpty();
- }
-
- @Override
- public boolean contains(@Nullable Object element)
- {
- return multiset.contains(element);
- }
-
- @Override
- public boolean containsAll(Collection<?> es)
- {
- return multiset.containsAll(es);
- }
-
- @Override
- public boolean addAll(Collection<? extends T> collection)
- {
- return multiset.addAll(collection);
- }
-
- @Override
- public boolean add(T element)
- {
- return multiset.add(element);
- }
-
- @Override
- public boolean remove(@Nullable Object element)
- {
- return multiset.remove(element);
- }
-
- @Override
- public boolean removeAll(Collection<?> c)
- {
- return multiset.removeAll(c);
- }
-
- @Override
- public boolean retainAll(Collection<?> c)
- {
- return multiset.retainAll(c);
- }
-
- @Override
- public void clear()
- {
- multiset.clear();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
deleted file mode 100644
index 9742537..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableArrayList.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable.inmem;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-import java.util.ListIterator;
-
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-
-import com.google.common.collect.Lists;
-
-/**
- * An in memory implementation of the {@link Spillable.SpillableArrayList} interface.
- * <p>
- * Every operation delegates directly to a wrapped list created by {@link Lists#newArrayList()}; no data is written
- * to a store.
- * @param <T> The type of the data stored in the {@link InMemSpillableArrayList}
- */
-public class InMemSpillableArrayList<T> implements Spillable.SpillableArrayList<T>
-{
- private List<T> list = Lists.newArrayList();
-
- @Override
- public int size()
- {
- return list.size();
- }
-
- @Override
- public boolean isEmpty()
- {
- return list.isEmpty();
- }
-
- @Override
- public boolean contains(Object o)
- {
- return list.contains(o);
- }
-
- @Override
- public Iterator<T> iterator()
- {
- return list.iterator();
- }
-
- @Override
- public Object[] toArray()
- {
- return list.toArray();
- }
-
- @Override
- public <T1> T1[] toArray(T1[] t1s)
- {
- return list.toArray(t1s);
- }
-
- @Override
- public boolean add(T t)
- {
- return list.add(t);
- }
-
- @Override
- public boolean remove(Object o)
- {
- return list.remove(o);
- }
-
- @Override
- public boolean containsAll(Collection<?> collection)
- {
- return list.containsAll(collection);
- }
-
- @Override
- public boolean addAll(Collection<? extends T> collection)
- {
- return list.addAll(collection);
- }
-
- @Override
- public boolean addAll(int i, Collection<? extends T> collection)
- {
- return list.addAll(i, collection);
- }
-
- @Override
- public boolean removeAll(Collection<?> collection)
- {
- return list.removeAll(collection);
- }
-
- @Override
- public boolean retainAll(Collection<?> collection)
- {
- return list.retainAll(collection);
- }
-
- @Override
- public void clear()
- {
- list.clear();
- }
-
- @Override
- public T get(int i)
- {
- return list.get(i);
- }
-
- @Override
- public T set(int i, T t)
- {
- return list.set(i, t);
- }
-
- @Override
- public void add(int i, T t)
- {
- list.add(i, t);
- }
-
- @Override
- public T remove(int i)
- {
- return list.remove(i);
- }
-
- @Override
- public int indexOf(Object o)
- {
- return list.indexOf(o);
- }
-
- @Override
- public int lastIndexOf(Object o)
- {
- return list.lastIndexOf(o);
- }
-
- @Override
- public ListIterator<T> listIterator()
- {
- return list.listIterator();
- }
-
- @Override
- public ListIterator<T> listIterator(int i)
- {
- return list.listIterator(i);
- }
-
- @Override
- public List<T> subList(int i, int i1)
- {
- // Returns the wrapped list's own sublist; no copy is made here.
- return list.subList(i, i1);
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
deleted file mode 100644
index 8376bd5..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableByteArrayListMultimap.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable.inmem;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import javax.annotation.Nullable;
-
-import org.apache.apex.malhar.lib.state.spillable.Spillable;
-
-import com.esotericsoftware.kryo.serializers.FieldSerializer;
-import com.esotericsoftware.kryo.serializers.JavaSerializer;
-import com.google.common.collect.ArrayListMultimap;
-import com.google.common.collect.ListMultimap;
-import com.google.common.collect.Multimap;
-import com.google.common.collect.Multiset;
-
-/**
- * An in memory implementation of the {@link Spillable.SpillableByteArrayListMultimap} interface.
- * <p>
- * Nothing is spilled: the data lives in a Guava {@link ArrayListMultimap} created when the instance is constructed,
- * and every method simply forwards to it.
- * @param <K> The type of the keys stored in the {@link InMemSpillableByteArrayListMultimap}
- * @param <V> The type of the values stored in the {@link InMemSpillableByteArrayListMultimap}
- */
-public class InMemSpillableByteArrayListMultimap<K, V> implements Spillable.SpillableByteArrayListMultimap<K, V>
-{
- // When this class is serialized with Kryo's FieldSerializer, this field is written with Java serialization.
- // NOTE(review): presumably because Kryo's field-by-field serialization does not cope with Guava's multimap
- // internals — confirm.
- @FieldSerializer.Bind(JavaSerializer.class)
- private ListMultimap<K, V> multimap = ArrayListMultimap.create();
-
- @Override
- public List<V> get(@Nullable K key)
- {
- return multimap.get(key);
- }
-
- @Override
- public Set<K> keySet()
- {
- return multimap.keySet();
- }
-
- @Override
- public Multiset<K> keys()
- {
- return multimap.keys();
- }
-
- @Override
- public Collection<V> values()
- {
- return multimap.values();
- }
-
- @Override
- public Collection<Map.Entry<K, V>> entries()
- {
- return multimap.entries();
- }
-
- @Override
- public List<V> removeAll(@Nullable Object key)
- {
- return multimap.removeAll(key);
- }
-
- @Override
- public void clear()
- {
- multimap.clear();
- }
-
- @Override
- public int size()
- {
- return multimap.size();
- }
-
- @Override
- public boolean isEmpty()
- {
- return multimap.isEmpty();
- }
-
- @Override
- public boolean containsKey(@Nullable Object key)
- {
- return multimap.containsKey(key);
- }
-
- @Override
- public boolean containsValue(@Nullable Object value)
- {
- return multimap.containsValue(value);
- }
-
- @Override
- public boolean containsEntry(@Nullable Object key, @Nullable Object value)
- {
- return multimap.containsEntry(key, value);
- }
-
- @Override
- public boolean put(@Nullable K key, @Nullable V value)
- {
- return multimap.put(key, value);
- }
-
- @Override
- public boolean remove(@Nullable Object key, @Nullable Object value)
- {
- return multimap.remove(key, value);
- }
-
- @Override
- public boolean putAll(@Nullable K key, Iterable<? extends V> values)
- {
- return multimap.putAll(key, values);
- }
-
- @Override
- public boolean putAll(Multimap<? extends K, ? extends V> m)
- {
- return multimap.putAll(m);
- }
-
- @Override
- public List<V> replaceValues(K key, Iterable<? extends V> values)
- {
- return multimap.replaceValues(key, values);
- }
-
- @Override
- public Map<K, Collection<V>> asMap()
- {
- return multimap.asMap();
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
deleted file mode 100644
index 25e8b2c..0000000
--- a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableComplexComponent.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.state.spillable.inmem;
-
-import org.apache.apex.malhar.lib.state.spillable.SpillableComplexComponent;
-import org.apache.apex.malhar.lib.utils.serde.Serde;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.Slice;
-
-/**
- * An in memory implementation of {@link SpillableComplexComponent}.
- * <p>
- * Every factory method ignores its identifier, bucket and serde arguments and returns a fresh in-memory
- * instance. Spillable byte maps and spillable queues are not supported and throw
- * {@link UnsupportedOperationException}. The operator lifecycle callbacks are no-ops.
- */
-public class InMemSpillableComplexComponent implements SpillableComplexComponent
-{
- @Override
- public <T> SpillableArrayList<T> newSpillableArrayList(long bucket, Serde<T, Slice> serde)
- {
- return new InMemSpillableArrayList<>();
- }
-
- @Override
- public <T> SpillableArrayList<T> newSpillableArrayList(byte[] identifier, long bucket,
- Serde<T, Slice> serde)
- {
- return new InMemSpillableArrayList<>();
- }
-
- @Override
- public <K, V> SpillableByteMap<K, V> newSpillableByteMap(long bucket, Serde<K, Slice> serdeKey,
- Serde<V, Slice> serdeValue)
- {
- // No in-memory byte map implementation is provided by this component.
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <K, V> SpillableByteMap<K, V> newSpillableByteMap(byte[] identifier, long bucket,
- Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
- {
- // No in-memory byte map implementation is provided by this component.
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(long bucket,
- Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
- {
- return new InMemSpillableByteArrayListMultimap<>();
- }
-
- @Override
- public <K, V> SpillableByteArrayListMultimap<K, V> newSpillableByteArrayListMultimap(byte[] identifier,
- long bucket, Serde<K, Slice> serdeKey, Serde<V, Slice> serdeValue)
- {
- return new InMemSpillableByteArrayListMultimap<>();
- }
-
- @Override
- public <T> SpillableByteMultiset<T> newSpillableByteMultiset(long bucket, Serde<T, Slice> serde)
- {
- return new InMemMultiset<>();
- }
-
- @Override
- public <T> SpillableByteMultiset<T> newSpillableByteMultiset(byte[] identifier, long bucket,
- Serde<T, Slice> serde)
- {
- return new InMemMultiset<>();
- }
-
- @Override
- public <T> SpillableQueue<T> newSpillableQueue(long bucket, Serde<T, Slice> serde)
- {
- // No in-memory queue implementation is provided by this component.
- throw new UnsupportedOperationException();
- }
-
- @Override
- public <T> SpillableQueue<T> newSpillableQueue(byte[] identifier, long bucket, Serde<T, Slice> serde)
- {
- // No in-memory queue implementation is provided by this component.
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void setup(Context.OperatorContext context)
- {
- // Nothing to set up: the created collections manage their own in-memory state.
- }
-
- @Override
- public void beginWindow(long windowId)
- {
- }
-
- @Override
- public void endWindow()
- {
- }
-
- @Override
- public void teardown()
- {
- }
-}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
new file mode 100644
index 0000000..0e65344
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/inmem/InMemSpillableStateStore.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.inmem;
+
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A simple in memory implementation of a {@link SpillableStateStore} backed by a {@link Map}.
+ * <p>
+ * Key/value pairs are kept in one hash map per bucket id. Nothing is persisted, so the lifecycle and checkpoint
+ * callbacks are no-ops. Asynchronous reads are not supported.
+ */
+@InterfaceStability.Evolving
+public class InMemSpillableStateStore implements SpillableStateStore
+{
+  /**
+   * Bucket id to (key to value). Buckets are created lazily, and only by {@link #put(long, Slice, Slice)}.
+   */
+  private Map<Long, Map<Slice, Slice>> store = Maps.newHashMap();
+
+  @Override
+  public void setup(Context.OperatorContext context)
+  {
+    // Nothing to initialize: all state lives in the in-memory map.
+  }
+
+  @Override
+  public void beginWindow(long windowId)
+  {
+    // No per-window bookkeeping is needed.
+  }
+
+  @Override
+  public void endWindow()
+  {
+    // No per-window bookkeeping is needed.
+  }
+
+  @Override
+  public void teardown()
+  {
+    // No resources to release.
+  }
+
+  /**
+   * Stores {@code value} under {@code key} in the given bucket. The bucket is created on first use, and any previous
+   * value for the key is replaced.
+   *
+   * @param bucketId the bucket to write to
+   * @param key the key to store
+   * @param value the value to associate with {@code key}
+   */
+  @Override
+  public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
+  {
+    Map<Slice, Slice> bucket = store.get(bucketId);
+
+    if (bucket == null) {
+      bucket = Maps.newHashMap();
+      store.put(bucketId, bucket);
+    }
+
+    bucket.put(key, value);
+  }
+
+  /**
+   * Looks up {@code key} in the given bucket.
+   * <p>
+   * A lookup never creates a bucket, so reads leave the store unchanged. Reading an unknown bucket simply yields
+   * {@code null}, exactly as reading an unknown key in an existing bucket does.
+   *
+   * @param bucketId the bucket to read from
+   * @param key the key to look up
+   * @return the stored value, or {@code null} if either the bucket or the key is absent
+   */
+  @Override
+  public Slice getSync(long bucketId, @NotNull Slice key)
+  {
+    Map<Slice, Slice> bucket = store.get(bucketId);
+    return bucket == null ? null : bucket.get(key);
+  }
+
+  /**
+   * Asynchronous reads are not supported by this in-memory store; use {@link #getSync(long, Slice)} instead.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Future<Slice> getAsync(long bucketId, @NotNull Slice key)
+  {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void beforeCheckpoint(long l)
+  {
+    // Nothing is persisted, so checkpoint notifications need no action.
+  }
+
+  @Override
+  public void checkpointed(long l)
+  {
+  }
+
+  @Override
+  public void committed(long l)
+  {
+  }
+
+  @Override
+  public String toString()
+  {
+    return store.toString();
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java
new file mode 100644
index 0000000..6d68acc
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/spillable/managed/ManagedStateSpillableStateStore.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.spillable.managed;
+
+import org.apache.apex.malhar.lib.state.managed.ManagedStateImpl;
+import org.apache.apex.malhar.lib.state.spillable.SpillableStateStore;
+
+import com.esotericsoftware.kryo.DefaultSerializer;
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+
+/**
+ * A {@link SpillableStateStore} whose implementation is entirely inherited from {@link ManagedStateImpl}. This class
+ * adds no behavior of its own; it only combines the managed-state implementation with the spillable state store
+ * interface.
+ */
+// Kryo uses FieldSerializer for this class unless told otherwise.
+@DefaultSerializer(FieldSerializer.class)
+public class ManagedStateSpillableStateStore extends ManagedStateImpl implements SpillableStateStore
+{
+  /**
+   * Creates a store via {@link ManagedStateImpl}'s no-arg constructor.
+   */
+  public ManagedStateSpillableStateStore()
+  {
+    // The superclass no-arg constructor is invoked implicitly. The constructor is spelled out so the public
+    // no-arg constructor is visible in the source.
+    // NOTE(review): presumably relied on for Kryo instantiation when restoring from a checkpoint — confirm.
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
index 85c34d9..9669981 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySerde.java
@@ -19,6 +19,7 @@
package org.apache.apex.malhar.lib.utils.serde;
import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
/**
* This is a simple pass through {@link Serde}. When serialization is performed the input byte array is returned.
@@ -26,6 +27,7 @@ import org.apache.commons.lang3.mutable.MutableInt;
*
* @since 3.4.0
*/
+@InterfaceStability.Evolving
public class PassThruByteArraySerde implements Serde<byte[], byte[]>
{
@Override
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
new file mode 100644
index 0000000..436e7f8
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruByteArraySliceSerde.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This is a simple {@link Serde} which serializes and deserializes byte arrays to {@link Slice}s. A byte array is
+ * serialized by wrapping it in a {@link Slice} object, without copying. It is deserialized by reading out exactly the
+ * bytes covered by the {@link Slice}.
+ *
+ * <b>Note:</b> {@link #deserialize(Slice, MutableInt)} does not use the incoming value of the offset argument as a
+ * read position. It always reads the whole slice, then advances the offset by the slice's length.
+ */
+public class PassThruByteArraySliceSerde implements Serde<byte[], Slice>
+{
+  /**
+   * Wraps {@code object} in a new {@link Slice}; the array is shared, not copied.
+   */
+  @Override
+  public Slice serialize(byte[] object)
+  {
+    return new Slice(object);
+  }
+
+  /**
+   * Returns the bytes covered by {@code object} and advances {@code offset} by {@code object.length}.
+   * <p>
+   * The backing buffer is returned as-is only when the slice spans all of it. Otherwise a copy of just the covered
+   * range is returned. This is done so that callers never see bytes outside the slice, such as when a slice starts
+   * at 0 but is shorter than its buffer.
+   *
+   * @param object the slice to read
+   * @param offset advanced by the number of bytes in the slice
+   * @return the bytes covered by the slice
+   */
+  @Override
+  public byte[] deserialize(Slice object, MutableInt offset)
+  {
+    offset.add(object.length);
+
+    byte[] buffer = object.buffer;
+
+    // A null buffer is returned as-is; otherwise the buffer can only be shared when it matches the slice exactly.
+    if (object.offset == 0 && (buffer == null || object.length == buffer.length)) {
+      return buffer;
+    }
+
+    byte[] bytes = new byte[object.length];
+    System.arraycopy(buffer, object.offset, bytes, 0, object.length);
+    return bytes;
+  }
+
+  /**
+   * Returns the bytes covered by {@code object}.
+   */
+  @Override
+  public byte[] deserialize(Slice object)
+  {
+    return deserialize(object, new MutableInt(0));
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
new file mode 100644
index 0000000..f9d93b3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/PassThruSliceSerde.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An identity {@link Serde} for {@link Slice}s. Serialization and deserialization both return the very same
+ * {@link Slice} instance they are given, without copying or transforming it.
+ */
+@InterfaceStability.Evolving
+public class PassThruSliceSerde implements Serde<Slice, Slice>
+{
+  /**
+   * Returns {@code slice} itself.
+   */
+  @Override
+  public Slice serialize(Slice slice)
+  {
+    return slice;
+  }
+
+  /**
+   * Returns {@code slice} itself. The offset is neither read nor advanced.
+   * NOTE(review): other Serdes here advance the offset by the bytes they consume; confirm no caller relies on that.
+   */
+  @Override
+  public Slice deserialize(Slice slice, MutableInt offset)
+  {
+    return slice;
+  }
+
+  /**
+   * Returns {@code slice} itself.
+   */
+  @Override
+  public Slice deserialize(Slice slice)
+  {
+    return slice;
+  }
+}
http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/9b6e11d8/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
new file mode 100644
index 0000000..c18af33
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/utils/serde/SerdeIntSlice.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.utils.serde;
+
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.datatorrent.lib.appdata.gpo.GPOUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A {@link Serde} that converts {@link Integer}s to and from {@link Slice}s, using the int encoding provided by
+ * {@link GPOUtils}.
+ */
+@InterfaceStability.Evolving
+public class SerdeIntSlice implements Serde<Integer, Slice>
+{
+  /**
+   * Encodes {@code object} with {@link GPOUtils#serializeInt} and wraps the resulting bytes in a new {@link Slice}.
+   */
+  @Override
+  public Slice serialize(Integer object)
+  {
+    byte[] encoded = GPOUtils.serializeInt(object);
+    return new Slice(encoded);
+  }
+
+  /**
+   * Reads an integer that starts {@code offset} bytes past the beginning of {@code slice}, then advances
+   * {@code offset} by 4.
+   */
+  @Override
+  public Integer deserialize(Slice slice, MutableInt offset)
+  {
+    // Convert the slice-relative offset into an absolute position in the backing buffer. A separate MutableInt is
+    // used so that the caller's offset is changed only once, by the fixed advance below.
+    MutableInt position = new MutableInt(slice.offset + offset.intValue());
+    int decoded = GPOUtils.deserializeInt(slice.buffer, position);
+    offset.add(4);
+    return decoded;
+  }
+
+  /**
+   * Reads an integer from the beginning of the slice.
+   */
+  @Override
+  public Integer deserialize(Slice object)
+  {
+    return deserialize(object, new MutableInt(0));
+  }
+}
+}