You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/25 14:15:25 UTC
[2/3] incubator-apex-malhar git commit: APEXMALHAR-1897 added managed
state
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
new file mode 100644
index 0000000..708cfeb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.annotation.OperatorAnnotation;
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.apex.malhar.lib.state.TimeSlicedBucketedState;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * This implementation of {@link AbstractManagedStateImpl} lets the client specify the time for each key.
+ * The value of time is used to derive the time-bucket of a key.
+ */
+@OperatorAnnotation(checkpointableWithinAppWindow = false)
+public class ManagedTimeStateImpl extends AbstractManagedStateImpl implements TimeSlicedBucketedState
+{
+  /**
+   * Creates the state with a single bucket; the count can be changed with {@link #setNumBuckets(int)}.
+   */
+  public ManagedTimeStateImpl()
+  {
+    this.numBuckets = 1;
+  }
+
+  /**
+   * Stores the key/value pair in the given bucket, inside the time-bucket derived from {@code time}.
+   */
+  @Override
+  public void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value)
+  {
+    putInBucket(bucketId, timeBucketAssigner.getTimeBucketFor(time), key, value);
+  }
+
+  /**
+   * Looks up the key in the given bucket without a time hint.
+   */
+  @Override
+  public Slice getSync(long bucketId, @NotNull Slice key)
+  {
+    //NOTE(review): -1 presumably means "no specific time-bucket" to getValueFromBucketSync; confirm in the base class
+    return getValueFromBucketSync(bucketId, -1, key);
+  }
+
+  /**
+   * Looks up the key in the time-bucket derived from {@code time}.
+   *
+   * @return {@link BucketedState#EXPIRED} when the time is already expired (nothing is looked up in that case);
+   * otherwise the result of the bucket lookup.
+   */
+  @Override
+  public Slice getSync(long bucketId, long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    return timeBucket == -1 ? BucketedState.EXPIRED : getValueFromBucketSync(bucketId, timeBucket, key);
+  }
+
+  /**
+   * Asynchronous variant of {@link #getSync(long, Slice)}.
+   */
+  @Override
+  public Future<Slice> getAsync(long bucketId, Slice key)
+  {
+    return getValueFromBucketAsync(bucketId, -1, key);
+  }
+
+  /**
+   * Asynchronous variant of {@link #getSync(long, long, Slice)}; an expired time yields an already completed
+   * future holding {@link BucketedState#EXPIRED}.
+   */
+  @Override
+  public Future<Slice> getAsync(long bucketId, long time, Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket != -1) {
+      return getValueFromBucketAsync(bucketId, timeBucket, key);
+    }
+    //expired: answer immediately since there is nothing to look up
+    return Futures.immediateFuture(BucketedState.EXPIRED);
+  }
+
+  /**
+   * @return the configured number of buckets (validated to be at least 1).
+   */
+  @Min(1)
+  @Override
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * Sets the number of buckets.
+   *
+   * @param numBuckets number of buckets
+   */
+  public void setNumBuckets(int numBuckets)
+  {
+    this.numBuckets = numBuckets;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
new file mode 100644
index 0000000..6f531eb
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImpl.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Queues;
+import com.google.common.collect.Sets;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * In this implementation of {@link AbstractManagedStateImpl} the buckets in memory are time-buckets.
+ */
+public class ManagedTimeUnifiedStateImpl extends AbstractManagedStateImpl implements BucketedState
+{
+  //time buckets passed to purgeTimeBucketsLessThanEqualTo(long); drained in endWindow()
+  private final transient LinkedBlockingQueue<Long> purgedTimeBuckets = Queues.newLinkedBlockingQueue();
+  //buckets removed from the buckets array (purged or replaced on a conflict) that still need to be torn down
+  private final transient Set<Bucket> bucketsForTeardown = Sets.newHashSet();
+
+  public ManagedTimeUnifiedStateImpl()
+  {
+    bucketsFileSystem = new TimeUnifiedBucketsFileSystem();
+  }
+
+  /**
+   * @return the number of time-buckets of the time bucket assigner, since every bucket in memory is a time-bucket.
+   */
+  @Override
+  public int getNumBuckets()
+  {
+    return timeBucketAssigner.getNumBuckets();
+  }
+
+  /**
+   * Stores the key/value pair in the time-bucket derived from {@code time}; the time-bucket doubles as bucket id.
+   */
+  @Override
+  public void put(long time, @NotNull Slice key, @NotNull Slice value)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    putInBucket(timeBucket, timeBucket, key, value);
+  }
+
+  /**
+   * @return {@link BucketedState#EXPIRED} if {@code time} is already expired; otherwise the value stored for the key
+   * in the corresponding time-bucket.
+   */
+  @Override
+  public Slice getSync(long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket == -1) {
+      //time is expired so return expired slice.
+      return BucketedState.EXPIRED;
+    }
+    return getValueFromBucketSync(timeBucket, timeBucket, key);
+  }
+
+  /**
+   * Asynchronous variant of {@link #getSync(long, Slice)}; an expired time yields an already completed future
+   * holding {@link BucketedState#EXPIRED}.
+   */
+  @Override
+  public Future<Slice> getAsync(long time, @NotNull Slice key)
+  {
+    long timeBucket = timeBucketAssigner.getTimeBucketFor(time);
+    if (timeBucket == -1) {
+      //time is expired so return expired slice.
+      return Futures.immediateFuture(BucketedState.EXPIRED);
+    }
+    return getValueFromBucketAsync(timeBucket, timeBucket, key);
+  }
+
+  /**
+   * Removes the purged time-buckets from memory and tears down every removed bucket that has no pending
+   * asynchronous query.
+   */
+  @Override
+  public void endWindow()
+  {
+    super.endWindow();
+
+    //collect all the purged time buckets
+    Long polled;
+    while ((polled = purgedTimeBuckets.poll()) != null) {
+      //unbox once so the id comparison below is always a primitive comparison
+      long purgedTimeBucket = polled;
+      int purgedTimeBucketIdx = getBucketIdx(purgedTimeBucket);
+      Bucket bucket = buckets[purgedTimeBucketIdx];
+      //the slot may already hold a newer time bucket (see handleBucketConflict), which must be kept
+      if (bucket != null && bucket.getBucketId() == purgedTimeBucket) {
+        bucketsForTeardown.add(bucket);
+        buckets[purgedTimeBucketIdx] = null;
+      }
+    }
+
+    //tear down all the eligible time buckets
+    Iterator<Bucket> bucketIterator = bucketsForTeardown.iterator();
+    while (bucketIterator.hasNext()) {
+      Bucket bucket = bucketIterator.next();
+      if (!tasksPerBucketId.containsKey(bucket.getBucketId())) {
+        //no pending asynchronous queries for this bucket id
+        bucket.teardown();
+        bucketIterator.remove();
+      }
+    }
+  }
+
+  /**
+   * Replaces the bucket at {@code bucketIdx} with a new, set-up bucket for {@code newBucketId}. The old bucket is
+   * queued and torn down later in {@link #endWindow()}.
+   *
+   * @throws IllegalArgumentException if the new time-bucket is not greater than the one it replaces.
+   */
+  @Override
+  protected void handleBucketConflict(int bucketIdx, long newBucketId)
+  {
+    Preconditions.checkArgument(buckets[bucketIdx].getBucketId() < newBucketId, "new time bucket should have a value"
+        + " greater than the old time bucket");
+    //Time buckets are purged periodically so here a bucket conflict is expected and so we just ignore conflicts.
+    bucketsForTeardown.add(buckets[bucketIdx]);
+    buckets[bucketIdx] = newBucket(newBucketId);
+    buckets[bucketIdx].setup(this);
+  }
+
+  /**
+   * Records the purged time-bucket so that {@link #endWindow()} can evict it, then delegates to the base class.
+   */
+  @Override
+  public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+  {
+    purgedTimeBuckets.add(timeBucket);
+    super.purgeTimeBucketsLessThanEqualTo(timeBucket);
+  }
+
+  /**
+   * This uses operator id instead of bucket id as the name of parent folder of time-buckets. This is because
+   * multiple partitions may work on same time-buckets.
+   */
+  private static class TimeUnifiedBucketsFileSystem extends BucketsFileSystem
+  {
+    /**
+     * @return the operator id, used in place of every bucket id passed to the file access layer.
+     */
+    private int operatorId()
+    {
+      return managedStateContext.getOperatorContext().getId();
+    }
+
+    @Override
+    protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getWriter(operatorId(), fileName);
+    }
+
+    @Override
+    protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getReader(operatorId(), fileName);
+    }
+
+    @Override
+    protected void rename(long bucketId, String fromName, String toName) throws IOException
+    {
+      managedStateContext.getFileAccess().rename(operatorId(), fromName, toName);
+    }
+
+    @Override
+    protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getOutputStream(operatorId(), fileName);
+    }
+
+    @Override
+    protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().getInputStream(operatorId(), fileName);
+    }
+
+    @Override
+    protected boolean exists(long bucketId, String fileName) throws IOException
+    {
+      return managedStateContext.getFileAccess().exists(operatorId(), fileName);
+    }
+
+    @Override
+    protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
+    {
+      return managedStateContext.getFileAccess().listFiles(operatorId());
+    }
+
+    @Override
+    protected void delete(long bucketId, String fileName) throws IOException
+    {
+      managedStateContext.getFileAccess().delete(operatorId(), fileName);
+    }
+
+    @Override
+    protected void deleteBucket(long bucketId) throws IOException
+    {
+      managedStateContext.getFileAccess().deleteBucket(operatorId());
+    }
+
+    @Override
+    protected void addBucketName(long bucketId)
+    {
+      long operatorBucketName = operatorId();
+      if (!bucketNamesOnFS.contains(operatorBucketName)) {
+        bucketNamesOnFS.add(operatorBucketName);
+      }
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(ManagedTimeUnifiedStateImpl.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
new file mode 100644
index 0000000..a359b31
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Tracks the size of state in memory and evicts buckets.
+ */
+class StateTracker extends TimerTask
+{
+  //bucket id -> bucket id & time wrapper
+  private final transient ConcurrentHashMap<Long, BucketIdTimeWrapper> bucketAccessTimes = new ConcurrentHashMap<>();
+
+  //wrappers ordered by last access time (least recently accessed first), ties broken by bucket id
+  private transient ConcurrentSkipListSet<BucketIdTimeWrapper> bucketHeap;
+
+  private final transient Timer memoryFreeService = new Timer();
+
+  protected transient AbstractManagedStateImpl managedStateImpl;
+
+  /**
+   * Creates the access-ordered heap and schedules this task at a fixed rate of
+   * {@code managedStateImpl.getCheckStateSizeInterval()}.
+   *
+   * @param managedStateImpl the managed state whose buckets are tracked and freed.
+   */
+  void setup(@NotNull AbstractManagedStateImpl managedStateImpl)
+  {
+    this.managedStateImpl = Preconditions.checkNotNull(managedStateImpl, "managed state impl");
+
+    this.bucketHeap = new ConcurrentSkipListSet<>(
+        new Comparator<BucketIdTimeWrapper>()
+        {
+          //Note: this comparator imposes orderings that are inconsistent with equals.
+          @Override
+          public int compare(BucketIdTimeWrapper o1, BucketIdTimeWrapper o2)
+          {
+            int byTime = Long.compare(o1.getLastAccessedTime(), o2.getLastAccessedTime());
+            return byTime != 0 ? byTime : Long.compare(o1.bucketId, o2.bucketId);
+          }
+        });
+    long intervalMillis = managedStateImpl.getCheckStateSizeInterval().getMillis();
+    memoryFreeService.scheduleAtFixedRate(this, intervalMillis, intervalMillis);
+  }
+
+  /**
+   * Marks the bucket as accessed now, moving it to the most recently accessed end of the heap.
+   *
+   * @param bucketId id of the accessed bucket.
+   */
+  void bucketAccessed(long bucketId)
+  {
+    BucketIdTimeWrapper idTimeWrapper = bucketAccessTimes.get(bucketId);
+    if (idTimeWrapper != null) {
+      //the heap is ordered by access time, so the wrapper must leave the heap before its time changes
+      bucketHeap.remove(idTimeWrapper);
+    } else {
+      idTimeWrapper = new BucketIdTimeWrapper(bucketId);
+      //register the wrapper so later accesses re-position it instead of adding duplicates to the heap
+      bucketAccessTimes.put(bucketId, idTimeWrapper);
+    }
+    idTimeWrapper.setLastAccessedTime(System.currentTimeMillis());
+    bucketHeap.add(idTimeWrapper);
+  }
+
+  /**
+   * When the total size of the in-memory buckets exceeds the configured maximum, asks the least recently accessed
+   * buckets to free memory until the size is within the limit, the heap is exhausted, or the next bucket was
+   * accessed within {@code getDurationPreventingFreeingSpace()}.
+   */
+  @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+  @Override
+  public void run()
+  {
+    synchronized (managedStateImpl.commitLock) {
+      //freeing of state needs to be stopped during commit as commit results in transferring data to a state which
+      // can be freed up as well.
+      long bytesSum = 0;
+      for (Bucket bucket : managedStateImpl.buckets) {
+        if (bucket != null) {
+          bytesSum += bucket.getSizeInBytes();
+        }
+      }
+
+      if (bytesSum > managedStateImpl.getMaxMemorySize()) {
+        Duration duration = managedStateImpl.getDurationPreventingFreeingSpace();
+        long durationMillis = 0;
+        if (duration != null) {
+          durationMillis = duration.getMillis();
+        }
+
+        BucketIdTimeWrapper idTimeWrapper;
+        while (bytesSum > managedStateImpl.getMaxMemorySize() && null != (idTimeWrapper = leastRecentlyAccessed())) {
+          //trigger buckets to free space
+
+          if (System.currentTimeMillis() - idTimeWrapper.getLastAccessedTime() < durationMillis) {
+            //if the least recently used bucket cannot free up space because it was accessed within the
+            //specified duration then subsequent buckets cannot free space as well because this heap is ordered by time.
+            break;
+          }
+          long bucketId = idTimeWrapper.bucketId;
+          Bucket bucket = managedStateImpl.getBucket(bucketId);
+          if (bucket != null) {
+            synchronized (bucket) {
+              long sizeFreed;
+              try {
+                sizeFreed = bucket.freeMemory();
+                LOG.debug("size freed {} {}", bucketId, sizeFreed);
+              } catch (IOException e) {
+                managedStateImpl.throwable.set(e);
+                throw new RuntimeException("freeing " + bucketId, e);
+              }
+              bytesSum -= sizeFreed;
+            }
+          }
+          //always drop the entry: a bucket that is no longer in memory would otherwise stay at the head of the
+          //heap and this loop would never terminate while holding the commit lock.
+          bucketHeap.remove(idTimeWrapper);
+          bucketAccessTimes.remove(bucketId);
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the least recently accessed wrapper, or null when the heap is empty. The heap may be modified by
+   * {@link #bucketAccessed(long)} while this task runs, so it can become empty between an emptiness check and
+   * {@code first()}, which throws instead of returning null; an uncaught exception would stop the timer.
+   */
+  private BucketIdTimeWrapper leastRecentlyAccessed()
+  {
+    try {
+      return bucketHeap.first();
+    } catch (java.util.NoSuchElementException e) {
+      return null;
+    }
+  }
+
+  /**
+   * Cancels the timer that runs this task.
+   */
+  void teardown()
+  {
+    memoryFreeService.cancel();
+  }
+
+  /**
+   * Wrapper class for bucket id and the last time the bucket was accessed.
+   */
+  private static class BucketIdTimeWrapper
+  {
+    private final long bucketId;
+    private long lastAccessedTime;
+
+    BucketIdTimeWrapper(long bucketId)
+    {
+      this.bucketId = bucketId;
+    }
+
+    private synchronized long getLastAccessedTime()
+    {
+      return lastAccessedTime;
+    }
+
+    private synchronized void setLastAccessedTime(long lastAccessedTime)
+    {
+      this.lastAccessedTime = lastAccessedTime;
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+      if (this == o) {
+        return true;
+      }
+      if (!(o instanceof BucketIdTimeWrapper)) {
+        return false;
+      }
+
+      BucketIdTimeWrapper that = (BucketIdTimeWrapper)o;
+      //Note: the comparator used with bucket heap imposes orderings that are inconsistent with equals
+      return bucketId == that.bucketId;
+    }
+
+    @Override
+    public int hashCode()
+    {
+      return (int)(bucketId ^ (bucketId >>> 32));
+    }
+  }
+
+  private static final Logger LOG = LoggerFactory.getLogger(StateTracker.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
new file mode 100644
index 0000000..745353b
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java
@@ -0,0 +1,242 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.appdata.query.WindowBoundedService;
+
+/**
+ * Keeps track of time buckets.<br/>
+ *
+ * The data of a bucket is further divided into time-buckets. This component controls the length of time buckets,
+ * which time-bucket an event falls into and sliding the time boundaries.
+ * <p/>
+ *
+ * The configuration {@link #expireBefore}, {@link #bucketSpan} and {@link #referenceInstant} (default time: system
+ * time during initialization of TimeBucketAssigner) are used to calculate number of time-buckets.<br/>
+ * For example, if <code>expireBefore = 1 hour</code>, <code>bucketSpan = 30 minutes</code> and
+ * <code>referenceInstant = current-time</code>, then <code>
+ * numBuckets = 60 minutes / 30 minutes = 2</code>.<br/>
+ *
+ * These properties once configured shouldn't be changed because that will result in different time-buckets
+ * for the same (key,time) pair after a failure.
+ * <p/>
+ *
+ * The time boundaries, start and end, periodically move forward by the span of a single time-bucket. Any event
+ * with {@code time < start} is expired. The expiry task slides these boundaries asynchronously, between application
+ * windows.<br/>
+ * The boundaries move only between application windows to ensure the consistency of a checkpoint. Checkpoints happen
+ * at application window boundaries, so if start and end were not restricted to move only at those boundaries, the
+ * old value of 'start' could be saved together with the new value of 'end'.
+ *
+ * <p/>
+ *
+ * The boundaries can also be moved by {@link #getTimeBucketFor(long)}. The time which is passed as an argument to this
+ * method can be ahead of <code>end</code>. This means that the corresponding event is a future event
+ * (wrt TimeBucketAssigner) and cannot be ignored. Therefore it is accounted by sliding boundaries further.
+ */
+public class TimeBucketAssigner implements ManagedStateComponent
+{
+  @NotNull
+  private Instant referenceInstant = new Instant();
+
+  @NotNull
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration expireBefore = Duration.standardDays(2);
+
+  @FieldSerializer.Bind(JavaSerializer.class)
+  private Duration bucketSpan;
+
+  private long bucketSpanMillis;
+
+  //[start, end) is the unexpired time range; both only move forward by whole multiples of bucketSpanMillis
+  private long start;
+  private long end;
+  private int numBuckets;
+  //referenceInstant - expireBefore: the origin from which time-bucket keys are counted
+  private transient long fixedStart;
+  //key of the time-bucket that contains start
+  private transient long lowestTimeBucket;
+
+  private boolean initialized;
+
+  private transient WindowBoundedService windowBoundedService;
+
+  private transient PurgeListener purgeListener = null;
+
+  //guards start, end and lowestTimeBucket, which both the expiry task and getTimeBucketFor(long) update
+  private final transient Object lock = new Object();
+
+  private final transient Runnable expiryTask = new Runnable()
+  {
+    @Override
+    public void run()
+    {
+      synchronized (lock) {
+        start += bucketSpanMillis;
+        end += bucketSpanMillis;
+        //derive the lowest live time-bucket from start instead of incrementing a counter, because
+        //getTimeBucketFor(long) may have moved start forward by several buckets since the last run.
+        lowestTimeBucket = (start - fixedStart) / bucketSpanMillis;
+        if (purgeListener != null) {
+          //every time-bucket below the lowest live one is expired
+          purgeListener.purgeTimeBucketsLessThanEqualTo(lowestTimeBucket - 1);
+        }
+      }
+    }
+  };
+
+  /**
+   * Computes the time boundaries on first use (later setups keep the checkpointed start/end) and sets up the
+   * service that periodically slides them.
+   *
+   * @param managedStateContext context providing the operator context.
+   * @throws IllegalArgumentException if the bucket span is not positive.
+   */
+  @Override
+  public void setup(@NotNull ManagedStateContext managedStateContext)
+  {
+    Context.OperatorContext context = managedStateContext.getOperatorContext();
+    fixedStart = referenceInstant.getMillis() - expireBefore.getMillis();
+
+    if (!initialized) {
+      if (bucketSpan == null) {
+        //default span: APPLICATION_WINDOW_COUNT streaming windows, i.e. one application window
+        bucketSpan = Duration.millis(context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+            context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS));
+      }
+      bucketSpanMillis = bucketSpan.getMillis();
+      //every key computation divides by the span
+      Preconditions.checkArgument(bucketSpanMillis > 0, "bucket span should be positive: %s", bucketSpan);
+      start = fixedStart;
+      //ceiling of expireBefore / bucketSpan
+      numBuckets = (int)((expireBefore.getMillis() + bucketSpanMillis - 1) / bucketSpanMillis);
+      end = start + (numBuckets * bucketSpanMillis);
+
+      initialized = true;
+    }
+    lowestTimeBucket = (start - fixedStart) / bucketSpanMillis;
+    windowBoundedService = new WindowBoundedService(bucketSpanMillis, expiryTask);
+    windowBoundedService.setup(context);
+  }
+
+  public void beginWindow(long windowId)
+  {
+    windowBoundedService.beginWindow(windowId);
+  }
+
+  public void endWindow()
+  {
+    windowBoundedService.endWindow();
+  }
+
+  /**
+   * Get the time-bucket key for the long value. If the value is at or beyond the current end (which is exclusive),
+   * the boundaries slide forward, in whole time-buckets, just far enough for the value to fall inside them.
+   *
+   * @param value value from which bucket key is derived.
+   * @return -1 if value is already expired; bucket key otherwise.
+   */
+  public long getTimeBucketFor(long value)
+  {
+    synchronized (lock) {
+      if (value < start) {
+        return -1;
+      }
+      long diffFromFixedStart = value - fixedStart;
+      long key = diffFromFixedStart / bucketSpanMillis;
+      //[start, end) holds exactly numBuckets keys, so a value equal to end already needs the window to slide
+      if (value >= end) {
+        long move = ((value - end) / bucketSpanMillis + 1) * bucketSpanMillis;
+        start += move;
+        end += move;
+      }
+      return key;
+    }
+  }
+
+  /**
+   * @param purgeListener listener notified by the expiry task of time-buckets that can be purged.
+   */
+  public void setPurgeListener(@NotNull PurgeListener purgeListener)
+  {
+    this.purgeListener = Preconditions.checkNotNull(purgeListener, "purge listener");
+  }
+
+  @Override
+  public void teardown()
+  {
+    windowBoundedService.teardown();
+  }
+
+  /**
+   * @return number of buckets.
+   */
+  public int getNumBuckets()
+  {
+    return numBuckets;
+  }
+
+  /**
+   * @return reference instant
+   */
+  public Instant getReferenceInstant()
+  {
+    return referenceInstant;
+  }
+
+  /**
+   * Sets the reference instant (by default the system time when the streaming app is created).
+   * This instant with {@link #expireBefore} is used to calculate the {@link #start} and {@link #end}.
+   *
+   * @param referenceInstant instant from which the time boundaries are derived
+   */
+  public void setReferenceInstant(Instant referenceInstant)
+  {
+    this.referenceInstant = referenceInstant;
+  }
+
+  /**
+   * @return duration before which the data is expired.
+   */
+  public Duration getExpireBefore()
+  {
+    return expireBefore;
+  }
+
+  /**
+   * Sets the duration which denotes expiry. Any event with time before this duration is considered to be expired.
+   * @param expireBefore duration
+   */
+  public void setExpireBefore(Duration expireBefore)
+  {
+    this.expireBefore = expireBefore;
+  }
+
+  /**
+   * @return time-bucket span
+   */
+  public Duration getBucketSpan()
+  {
+    return bucketSpan;
+  }
+
+  /**
+   * Sets the length of a time bucket.
+   * @param bucketSpan length of time bucket
+   */
+  public void setBucketSpan(Duration bucketSpan)
+  {
+    this.bucketSpan = bucketSpan;
+  }
+
+  /**
+   * The listener is informed when time slides and the time-buckets older than the new lowest time-bucket can be
+   * purged.
+   */
+  public interface PurgeListener
+  {
+    void purgeTimeBucketsLessThanEqualTo(long timeBucket);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
new file mode 100644
index 0000000..99c8dd1
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Unstable
+package org.apache.apex.malhar.lib.state.managed;
+
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java b/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
new file mode 100644
index 0000000..e75d867
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+@InterfaceStability.Unstable
+package org.apache.apex.malhar.lib.state;
+
+import org.apache.hadoop.classification.InterfaceStability;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
new file mode 100644
index 0000000..51e9a13
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link BucketsFileSystem}: writing bucket data to files and maintaining per-bucket time-bucket
+ * metadata.
+ *
+ * Each test sets up and tears down {@code testMeta.bucketsFileSystem} exactly once. Shared steps live in private
+ * helpers that do not touch the lifecycle, so the file system is never set up twice or used after teardown.
+ */
+public class BucketsFileSystemTest
+{
+  class TestMeta extends TestWatcher
+  {
+    BucketsFileSystem bucketsFileSystem;
+    String applicationPath;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+
+      managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(7));
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      bucketsFileSystem = new BucketsFileSystem();
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testTransferBucket() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 1);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testTransferOfExistingBucket() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, unsavedBucket0);
+
+    Map<Slice, Bucket.BucketedValue> more = ManagedStateTestUtils.getTestBucketData(50, 100);
+    testMeta.bucketsFileSystem.writeBucketData(10, 0, more);
+
+    unsavedBucket0.putAll(more);
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 0, unsavedBucket0, 2);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testUpdateBucketMetaDataFile() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    BucketsFileSystem.MutableTimeBucketMeta mutableTbm = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    mutableTbm.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+
+    testMeta.bucketsFileSystem.updateBucketMetaFile(1);
+    BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNotNull(immutableTbm);
+    Assert.assertEquals("last transferred window", 10, immutableTbm.getLastTransferredWindowId());
+    Assert.assertEquals("size in bytes", 100, immutableTbm.getSizeInBytes());
+    Assert.assertEquals("first key", "1", immutableTbm.getFirstKey().stringValue());
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testGetTimeBucketMeta() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    BucketsFileSystem.TimeBucketMeta bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNull("bucket meta", bucketMeta);
+
+    testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNotNull("bucket meta not null", bucketMeta);
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testGetAllTimeBucketMeta() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    createAndVerifyTwoTimeBucketMetas();
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  @Test
+  public void testInvalidateTimeBucket() throws IOException
+  {
+    testMeta.bucketsFileSystem.setup(testMeta.managedStateContext);
+    createAndVerifyTwoTimeBucketMetas();
+
+    testMeta.bucketsFileSystem.invalidateTimeBucket(1, 1);
+    BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1);
+    Assert.assertNull("deleted tbm", immutableTbm);
+
+    TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+        testMeta.bucketsFileSystem.getAllTimeBuckets(1);
+
+    Assert.assertEquals("only 1 tbm", 1, timeBucketMetas.size());
+    immutableTbm = timeBucketMetas.iterator().next();
+
+    Assert.assertEquals("tbm 2", 2, immutableTbm.getTimeBucketId());
+    testMeta.bucketsFileSystem.teardown();
+  }
+
+  /**
+   * Creates metadata for time buckets 1 and 2 of bucket 1 and persists the bucket meta file.
+   * Then checks that {@code getAllTimeBuckets(1)} returns exactly those two entries, ordered 2 then 1.
+   *
+   * The caller must have already set up {@code testMeta.bucketsFileSystem}. This helper does not tear it down.
+   */
+  private void createAndVerifyTwoTimeBucketMetas() throws IOException
+  {
+    BucketsFileSystem.MutableTimeBucketMeta tbm1 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1);
+    tbm1.updateTimeBucketMeta(10, 100, new Slice("1".getBytes()));
+
+    BucketsFileSystem.MutableTimeBucketMeta tbm2 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 2);
+    tbm2.updateTimeBucketMeta(10, 100, new Slice("2".getBytes()));
+
+    testMeta.bucketsFileSystem.updateBucketMetaFile(1);
+    TreeSet<BucketsFileSystem.TimeBucketMeta> timeBucketMetas =
+        testMeta.bucketsFileSystem.getAllTimeBuckets(1);
+
+    // Without this check, the ordering loop below would pass vacuously on an empty or short set.
+    Assert.assertEquals("number of time buckets", 2, timeBucketMetas.size());
+
+    Iterator<BucketsFileSystem.TimeBucketMeta> iterator = timeBucketMetas.iterator();
+    int i = 2;
+    while (iterator.hasNext()) {
+      BucketsFileSystem.TimeBucketMeta tbm = iterator.next();
+      Assert.assertEquals("time bucket " + i, i, tbm.getTimeBucketId());
+      i--;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
new file mode 100644
index 0000000..cb8a97f
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/DefaultBucketTest.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link Bucket.DefaultBucket}: in-memory puts, reads through file readers, checkpointing, commit,
+ * teardown and freeing memory.
+ *
+ * Each test sets up and tears down {@code testMeta.defaultBucket} exactly once. Steps shared between tests are
+ * private helpers that do not call setup or teardown, so the bucket is never re-set-up or used after teardown.
+ */
+public class DefaultBucketTest
+{
+
+  class TestMeta extends TestWatcher
+  {
+    Bucket.DefaultBucket defaultBucket;
+    String applicationPath;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedStateContext = new MockManagedStateContext(ManagedStateTestUtils.getOperatorContext(9));
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      defaultBucket = new Bucket.DefaultBucket(1);
+      managedStateContext.getBucketsFileSystem().setup(managedStateContext);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      managedStateContext.getBucketsFileSystem().teardown();
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testPut()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    putOneAndVerify();
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testGetFromReader() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    writeBucketDataAndReadOne();
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testGetFromSpecificTimeBucket() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+
+    Slice value = testMeta.defaultBucket.get(one, 101, Bucket.ReadSource.READERS);
+    Assert.assertEquals("value one", one, value);
+
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testCheckpointed()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    putOneAndCheckpoint();
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testCommitted()
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    putOneAndCheckpoint();
+    testMeta.defaultBucket.committed(10);
+    Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testCommittedWithOpenReader() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    writeBucketDataAndReadOne();
+    Map<Long, FileAccess.FileReader> readers = testMeta.defaultBucket.getReaders();
+    Assert.assertTrue("reader open", readers.containsKey(101L));
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    testMeta.defaultBucket.put(two, 101, two);
+    Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10);
+    Assert.assertEquals("size", 1, unsaved.size());
+    testMeta.defaultBucket.committed(10);
+
+    Slice value = testMeta.defaultBucket.get(two, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value two", two, value);
+
+    value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+
+    Assert.assertTrue("reader closed", !readers.containsKey(101L));
+    testMeta.defaultBucket.teardown();
+  }
+
+  @Test
+  public void testTeardown() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    writeBucketDataAndReadOne();
+    Map<Long, FileAccess.FileReader> readers = testMeta.defaultBucket.getReaders();
+    Assert.assertTrue("reader open", readers.containsKey(101L));
+
+    testMeta.defaultBucket.teardown();
+    // NOTE(review): the message says "reader closed", but the assertion checks that the entry for time bucket 101
+    // is still present in the readers map after teardown. Confirm against DefaultBucket.teardown whether the map
+    // is meant to be cleared.
+    Assert.assertTrue("reader closed", readers.containsKey(101L));
+  }
+
+  @Test
+  public void testFreeMemory() throws IOException
+  {
+    testMeta.defaultBucket.setup(testMeta.managedStateContext);
+    writeBucketDataAndReadOne();
+    long initSize = testMeta.defaultBucket.getSizeInBytes();
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.defaultBucket.put(two, 101, two);
+
+    Assert.assertEquals("size", initSize + (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+
+    long sizeFreed = testMeta.defaultBucket.freeMemory();
+    Assert.assertEquals("size freed", initSize, sizeFreed);
+    Assert.assertEquals("existing size", (two.length * 2 + 64), testMeta.defaultBucket.getSizeInBytes());
+    testMeta.defaultBucket.teardown();
+  }
+
+  /**
+   * Puts key "1" with value "1" into time bucket 1 and checks that it is visible from MEMORY but not from READERS.
+   * Also checks the bucket's reported size: key length + value length + 64, which is the constant these tests
+   * expect.
+   * The bucket must already be set up.
+   */
+  private void putOneAndVerify()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.defaultBucket.put(one, 1, one);
+
+    Slice value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.MEMORY);
+    Assert.assertEquals("value one", one, value);
+
+    value = testMeta.defaultBucket.get(one, 1, Bucket.ReadSource.READERS);
+    Assert.assertNull("value not present", value);
+
+    Assert.assertEquals("size of bucket", one.length * 2 + 64, testMeta.defaultBucket.getSizeInBytes());
+  }
+
+  /**
+   * Runs {@link #putOneAndVerify()}, checkpoints window 10, and checks that the single unsaved entry is key "1"
+   * with value "1" in time bucket 1. The bucket must already be set up.
+   */
+  private void putOneAndCheckpoint()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    putOneAndVerify();
+    Map<Slice, Bucket.BucketedValue> unsaved = testMeta.defaultBucket.checkpoint(10);
+    Assert.assertEquals("size", 1, unsaved.size());
+
+    Map.Entry<Slice, Bucket.BucketedValue> entry = unsaved.entrySet().iterator().next();
+    Assert.assertEquals("key", one, entry.getKey());
+    Assert.assertEquals("value", one, entry.getValue().getValue());
+    Assert.assertEquals("time bucket", 1, entry.getValue().getTimeBucket());
+  }
+
+  /**
+   * Writes the test data for bucket 1 and verifies it on disk, then reads key "1" through the READERS source.
+   * The data is keys "0".."4", placed in time buckets 100..104. Key "1" lives in time bucket 101, so the read
+   * opens that reader. The bucket must already be set up.
+   */
+  private void writeBucketDataAndReadOne() throws IOException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+
+    Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, 100);
+    testMeta.managedStateContext.getBucketsFileSystem().writeBucketData(1, 1, unsavedBucket0);
+
+    ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), 1, unsavedBucket0, 1);
+
+    Slice value = testMeta.defaultBucket.get(one, -1, Bucket.ReadSource.READERS);
+    Assert.assertEquals("value one", one, value);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
new file mode 100644
index 0000000..ca64693
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManagerTest.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+
+import javax.validation.constraints.NotNull;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link IncrementalCheckpointManager}: serialization, saving and loading window data, transferring
+ * window files into bucket files, and purging time buckets.
+ */
+public class IncrementalCheckpointManagerTest
+{
+  class TestMeta extends TestWatcher
+  {
+    IncrementalCheckpointManager checkpointManager;
+    String applicationPath;
+    int operatorId = 1;
+    MockManagedStateContext managedStateContext;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      Context.OperatorContext operatorContext = ManagedStateTestUtils.getOperatorContext(operatorId, applicationPath);
+      managedStateContext = new MockManagedStateContext(operatorContext);
+
+      ((FileAccessFSImpl)managedStateContext.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+      managedStateContext.getFileAccess().init();
+
+      checkpointManager = new IncrementalCheckpointManager();
+
+      managedStateContext.getTimeBucketAssigner().setup(managedStateContext);
+      managedStateContext.getBucketsFileSystem().setup(managedStateContext);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      managedStateContext.getTimeBucketAssigner().teardown();
+      managedStateContext.getBucketsFileSystem().teardown();
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    IncrementalCheckpointManager deserialized = KryoCloneUtils.cloneObject(testMeta.checkpointManager);
+    Assert.assertNotNull("state window data manager", deserialized);
+  }
+
+  @Test
+  public void testSave() throws IOException
+  {
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false);
+    testMeta.checkpointManager.teardown();
+
+    testMeta.checkpointManager = new IncrementalCheckpointManager();
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+    @SuppressWarnings("unchecked")
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5After = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+        testMeta.checkpointManager.load(testMeta.operatorId, 10);
+
+    Assert.assertEquals("saved", buckets5, buckets5After);
+    testMeta.checkpointManager.teardown();
+  }
+
+  @Test
+  public void testTransferWindowFiles() throws IOException, InterruptedException
+  {
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+
+    Map<Long, Map<Slice, Bucket.BucketedValue>> buckets5 = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(buckets5, testMeta.operatorId, 10, false);
+    //Need to synchronously call transfer window files so shutting down the other thread.
+    testMeta.checkpointManager.teardown();
+    Thread.sleep(500);
+
+    testMeta.checkpointManager.committed(testMeta.operatorId, 10);
+    testMeta.checkpointManager.transferWindowFiles();
+
+    for (int i = 0; i < 5; i++) {
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i,
+          buckets5.get((long)i), 1);
+    }
+  }
+
+  @Test
+  public void testCommitted() throws IOException, InterruptedException
+  {
+    CountDownLatch latch = new CountDownLatch(5);
+    MockBucketsFileSystem mockBucketsFileSystem = new MockBucketsFileSystem(latch);
+
+    testMeta.managedStateContext.setBucketsFileSystem(mockBucketsFileSystem);
+
+    mockBucketsFileSystem.setup(testMeta.managedStateContext);
+    testMeta.checkpointManager.setup(testMeta.managedStateContext);
+
+    Map<Long, Map<Slice, Bucket.BucketedValue>> data = ManagedStateTestUtils.getTestData(0, 5, 0);
+    testMeta.checkpointManager.save(data, testMeta.operatorId, 10, false);
+    testMeta.checkpointManager.committed(testMeta.operatorId, 10);
+    latch.await();
+    testMeta.checkpointManager.teardown();
+    Thread.sleep(500);
+
+    for (int i = 0; i < 5; i++) {
+      ManagedStateTestUtils.transferBucketHelper(testMeta.managedStateContext.getFileAccess(), i, data.get((long)i), 1);
+    }
+  }
+
+  /**
+   * Transfers window files into bucket files and checks that bucket data exists on disk. It then deletes every time
+   * bucket up to and including 200 and checks that the bucket data directory is empty.
+   */
+  @Test
+  public void testPurge() throws IOException, InterruptedException
+  {
+    Path bucketDataPath = new Path(testMeta.applicationPath + "/bucket_data");
+    // newInstance() returns a private, uncached FileSystem; the caller owns it and must close it.
+    try (FileSystem fileSystem = FileSystem.newInstance(new Configuration())) {
+      testTransferWindowFiles();
+      RemoteIterator<LocatedFileStatus> iterator = fileSystem.listLocatedStatus(bucketDataPath);
+      Assert.assertTrue(iterator.hasNext());
+
+      testMeta.managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(200);
+
+      iterator = fileSystem.listLocatedStatus(bucketDataPath);
+      Assert.assertFalse("All buckets should be deleted", iterator.hasNext());
+    }
+  }
+
+  /**
+   * A {@link BucketsFileSystem} that counts down a latch each time bucket data is written for window 10. This lets
+   * a test wait for the asynchronous transfer triggered by {@code committed}.
+   */
+  static class MockBucketsFileSystem extends BucketsFileSystem
+  {
+    private final transient CountDownLatch latch;
+
+    public MockBucketsFileSystem(@NotNull CountDownLatch latch)
+    {
+      super();
+      this.latch = Preconditions.checkNotNull(latch);
+    }
+
+    @Override
+    protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data)
+        throws IOException
+    {
+      super.writeBucketData(windowId, bucketId, data);
+      if (windowId == 10) {
+        latch.countDown();
+      }
+    }
+  }
+
+  private static final transient Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManagerTest.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
new file mode 100644
index 0000000..f86b5d3
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java
@@ -0,0 +1,182 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Tests for {@link ManagedStateImpl}: serialization, synchronous and asynchronous gets, incremental checkpoints and
+ * committing windows. Every test that calls {@code setup} also calls {@code teardown}.
+ */
+public class ManagedStateImplTest
+{
+
+  class TestMeta extends TestWatcher
+  {
+    ManagedStateImpl managedState;
+    Context.OperatorContext operatorContext;
+    String applicationPath;
+
+    @Override
+    protected void starting(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+      managedState = new ManagedStateImpl();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+      ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+      operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      ManagedStateTestUtils.cleanTargetDir(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testSerde() throws IOException
+  {
+    ManagedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+    // JUnit convention: expected (the original) first, actual (the clone) second.
+    Assert.assertEquals("num buckets", testMeta.managedState.getNumBuckets(), deserialized.getNumBuckets());
+  }
+
+  @Test
+  public void testSimplePutGet()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(0, one, one);
+    Slice value = testMeta.managedState.getSync(0, one);
+    testMeta.managedState.endWindow();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromFlash() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    testMeta.managedState.beginWindow(System.currentTimeMillis());
+    testMeta.managedState.put(0, one, one);
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Slice value = valFuture.get();
+
+    Assert.assertEquals("value of one", one, value);
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testIncrementalCheckpoint()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
+    Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+    Assert.assertEquals("value of one", one, defaultBucket.getCheckpointedData().get(time).get(one).getValue());
+
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    testMeta.managedState.beginWindow(time + 1);
+    testMeta.managedState.put(0, two, two);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time + 1);
+
+    Assert.assertEquals("value of two", two, defaultBucket.getCheckpointedData().get(time + 1).get(two).getValue());
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromCheckpoint() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Assert.assertEquals("value of one", one, valFuture.get());
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testCommitted()
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    commitHelper(one, two);
+    Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+    Assert.assertEquals("value of one", one, defaultBucket.getCommittedData().get(one).getValue());
+
+    Assert.assertNull("value of two", defaultBucket.getCommittedData().get(two));
+    testMeta.managedState.teardown();
+  }
+
+  @Test
+  public void testAsyncGetFromCommitted() throws ExecutionException, InterruptedException
+  {
+    Slice one = ManagedStateTestUtils.getSliceFor("1");
+    Slice two = ManagedStateTestUtils.getSliceFor("2");
+    commitHelper(one, two);
+    Future<Slice> valFuture = testMeta.managedState.getAsync(0, one);
+    Assert.assertEquals("value of one", one, valFuture.get());
+    // commitHelper set up the managed state; release it like every other test does.
+    testMeta.managedState.teardown();
+  }
+
+  /**
+   * Sets up the managed state and puts {@code one} in window {@code time} and {@code two} in window
+   * {@code time + 1}. It checkpoints both windows, then commits only window {@code time}. The caller is
+   * responsible for calling {@code teardown}.
+   */
+  private void commitHelper(Slice one, Slice two)
+  {
+    testMeta.managedState.setup(testMeta.operatorContext);
+    long time = System.currentTimeMillis();
+    testMeta.managedState.beginWindow(time);
+    testMeta.managedState.put(0, one, one);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time);
+
+    testMeta.managedState.beginWindow(time + 1);
+    testMeta.managedState.put(0, two, two);
+    testMeta.managedState.endWindow();
+    testMeta.managedState.beforeCheckpoint(time + 1);
+
+    testMeta.managedState.committed(time);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
new file mode 100644
index 0000000..f2251bd
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateTestUtils.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.TreeMap;
+
+import javax.annotation.Nullable;
+
+import org.junit.Assert;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+public class ManagedStateTestUtils
+{
+
+ static void cleanTargetDir(Description description)
+ {
+ try {
+ File out = new File("target/" + description.getClassName());
+ if (out.exists()) {
+ FileUtils.deleteDirectory(out);
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ static void transferBucketHelper(FileAccess fileAccess, long bucketId, Map<Slice, Bucket.BucketedValue> unsavedBucket,
+ int keysPerTimeBucket) throws IOException
+ {
+ RemoteIterator<LocatedFileStatus> iterator = fileAccess.listFiles(bucketId);
+ TreeMap<Slice, Slice> fromDisk = Maps.newTreeMap(new SliceComparator());
+ int size = 0;
+ while (iterator.hasNext()) {
+ LocatedFileStatus fileStatus = iterator.next();
+
+ String timeBucketStr = fileStatus.getPath().getName();
+ if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) {
+ //ignoring meta file
+ continue;
+ }
+
+ LOG.debug("bucket {} time-bucket {}", bucketId, timeBucketStr);
+
+ FileAccess.FileReader reader = fileAccess.getReader(bucketId, timeBucketStr);
+
+ reader.readFully(fromDisk);
+ size += keysPerTimeBucket;
+ Assert.assertEquals("size of bucket " + bucketId, size, fromDisk.size());
+ }
+
+ Assert.assertEquals("size of bucket " + bucketId, unsavedBucket.size(), fromDisk.size());
+
+ Map<Slice, Slice> testBucket = Maps.transformValues(unsavedBucket, new Function<Bucket.BucketedValue, Slice>()
+ {
+ @Override
+ public Slice apply(@Nullable Bucket.BucketedValue input)
+ {
+ assert input != null;
+ return input.getValue();
+ }
+ });
+ Assert.assertEquals("data of bucket" + bucketId, testBucket, fromDisk);
+ }
+
+ static Map<Long, Map<Slice, Bucket.BucketedValue>> getTestData(int startBucket, int endBucket, int keyStart)
+ {
+ Map<Long, Map<Slice, Bucket.BucketedValue>> data = Maps.newHashMap();
+ for (int i = startBucket; i < endBucket; i++) {
+ Map<Slice, Bucket.BucketedValue> bucketData = getTestBucketData(keyStart, 100);
+ data.put((long)i, bucketData);
+ }
+ return data;
+ }
+
+ static Map<Slice, Bucket.BucketedValue> getTestBucketData(int keyStart, long timeBucketStart)
+ {
+ Map<Slice, Bucket.BucketedValue> bucketData = Maps.newHashMap();
+ for (int j = 0; j < 5; j++) {
+ Slice keyVal = new Slice(Integer.toString(keyStart).getBytes());
+ bucketData.put(keyVal, new Bucket.BucketedValue(timeBucketStart + j, keyVal));
+ keyStart++;
+ }
+ return bucketData;
+ }
+
+ static Context.OperatorContext getOperatorContext(int operatorId, String applicationPath)
+ {
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_PATH, applicationPath);
+ return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
+ }
+
+ static Context.OperatorContext getOperatorContext(int operatorId)
+ {
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ return new OperatorContextTestHelper.TestIdOperatorContext(operatorId, attributes);
+ }
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(ManagedStateTestUtils.class);
+
+ static Slice getSliceFor(String x)
+ {
+ return new Slice(x.getBytes());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
new file mode 100644
index 0000000..ac4db39
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeStateImplTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+ public class ManagedTimeStateImplTest
+ {
+   /**
+    * Per-test fixture. Before each test it cleans the test's target directory and creates a fresh
+    * {@link ManagedTimeStateImpl} whose file access is rooted at {@code target/<class>/<method>/bucket_data}. It
+    * also builds an operator context (id 1) for the same application path. The directory is cleaned again when
+    * the test finishes.
+    */
+   static class TestMeta extends TestWatcher
+   {
+     ManagedTimeStateImpl managedState;
+     Context.OperatorContext operatorContext;
+     String applicationPath;
+
+     @Override
+     protected void starting(Description description)
+     {
+       ManagedStateTestUtils.cleanTargetDir(description);
+       managedState = new ManagedTimeStateImpl();
+       applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+       ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+       operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+     }
+
+     @Override
+     protected void finished(Description description)
+     {
+       ManagedStateTestUtils.cleanTargetDir(description);
+     }
+   }
+
+   @Rule
+   public TestMeta testMeta = new TestMeta();
+
+   @Test
+   public void testSerde() throws IOException
+   {
+     ManagedTimeStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+     Assert.assertEquals("num buckets", testMeta.managedState.getNumBuckets(), deserialized.getNumBuckets());
+   }
+
+   @Test
+   public void testAsyncGetFromReaders() throws IOException, ExecutionException, InterruptedException
+   {
+     Slice zero = ManagedStateTestUtils.getSliceFor("0");
+     long time = System.currentTimeMillis();
+
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       // The data is written to the bucket files only (never put()), so getAsync has to load it from disk.
+       Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, time);
+       testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+       ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(), 0, unsavedBucket0, 1);
+
+       Future<Slice> valFuture = testMeta.managedState.getAsync(0, zero);
+
+       Assert.assertEquals("value of zero", zero, valFuture.get());
+     } finally {
+       // always release the managed state, even when the assertion or get() fails
+       testMeta.managedState.teardown();
+     }
+   }
+
+   @Test
+   public void testPutGetWithTime()
+   {
+     Slice one = ManagedStateTestUtils.getSliceFor("1");
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       long time = System.currentTimeMillis();
+       testMeta.managedState.beginWindow(0);
+       testMeta.managedState.put(0, time, one, one);
+       Slice value = testMeta.managedState.getSync(0, time, one);
+       testMeta.managedState.endWindow();
+
+       Assert.assertEquals("value of one", one, value);
+     } finally {
+       testMeta.managedState.teardown();
+     }
+   }
+
+   @Test
+   public void testAsyncGetWithTime() throws ExecutionException, InterruptedException
+   {
+     Slice one = ManagedStateTestUtils.getSliceFor("1");
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       long time = System.currentTimeMillis();
+       testMeta.managedState.beginWindow(0);
+       testMeta.managedState.put(0, time, one, one);
+       Future<Slice> valFuture = testMeta.managedState.getAsync(0, time, one);
+       Slice value = valFuture.get();
+       // close the window that was opened above, mirroring testPutGetWithTime
+       testMeta.managedState.endWindow();
+
+       Assert.assertEquals("value of one", one, value);
+     } finally {
+       testMeta.managedState.teardown();
+     }
+   }
+
+   @Test
+   public void testRecovery() throws ExecutionException, InterruptedException
+   {
+     Slice one = ManagedStateTestUtils.getSliceFor("1");
+     testMeta.managedState.setup(testMeta.operatorContext);
+     long time = System.currentTimeMillis();
+     try {
+       testMeta.managedState.beginWindow(0);
+       testMeta.managedState.put(0, time, one, one);
+       testMeta.managedState.endWindow();
+       testMeta.managedState.beforeCheckpoint(0);
+     } finally {
+       testMeta.managedState.teardown();
+     }
+
+     //there is a failure and the operator is re-deployed.
+     testMeta.managedState.setStateTracker(new StateTracker());
+
+     // redeploy with the activation window set to the window that was checkpointed above (0)
+     Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+     attributes.put(DAG.APPLICATION_PATH, testMeta.applicationPath);
+     attributes.put(Context.OperatorContext.ACTIVATION_WINDOW_ID, 0L);
+     Context.OperatorContext operatorContext = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+     testMeta.managedState.setup(operatorContext);
+     try {
+       Bucket.DefaultBucket defaultBucket = (Bucket.DefaultBucket)testMeta.managedState.getBucket(0);
+       Assert.assertEquals("value of one", one, defaultBucket.get(one, time, Bucket.ReadSource.MEMORY));
+     } finally {
+       // the original test never tore down after the second setup, leaking the redeployed state
+       testMeta.managedState.teardown();
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
new file mode 100644
index 0000000..523a10a
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedTimeUnifiedStateImplTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+import com.datatorrent.lib.util.KryoCloneUtils;
+import com.datatorrent.netlet.util.Slice;
+
+ public class ManagedTimeUnifiedStateImplTest
+ {
+   /**
+    * Per-test fixture. Before each test it cleans the test's target directory and creates a fresh
+    * {@link ManagedTimeUnifiedStateImpl} whose file access is rooted at
+    * {@code target/<class>/<method>/bucket_data}. It also builds an operator context (id 9) for the same
+    * application path. The directory is cleaned again when the test finishes.
+    */
+   static class TestMeta extends TestWatcher
+   {
+     ManagedTimeUnifiedStateImpl managedState;
+     Context.OperatorContext operatorContext;
+     String applicationPath;
+
+     @Override
+     protected void starting(Description description)
+     {
+       ManagedStateTestUtils.cleanTargetDir(description);
+       managedState = new ManagedTimeUnifiedStateImpl();
+       applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+       ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+       operatorContext = ManagedStateTestUtils.getOperatorContext(9, applicationPath);
+     }
+
+     @Override
+     protected void finished(Description description)
+     {
+       ManagedStateTestUtils.cleanTargetDir(description);
+     }
+   }
+
+   @Rule
+   public TestMeta testMeta = new TestMeta();
+
+   @Test
+   public void testSerde() throws IOException
+   {
+     ManagedTimeUnifiedStateImpl deserialized = KryoCloneUtils.cloneObject(testMeta.managedState);
+     Assert.assertEquals("num buckets", testMeta.managedState.getNumBuckets(), deserialized.getNumBuckets());
+   }
+
+   @Test
+   public void testSimplePutGet()
+   {
+     Slice one = ManagedStateTestUtils.getSliceFor("1");
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       long time = System.currentTimeMillis();
+       testMeta.managedState.beginWindow(0);
+       testMeta.managedState.put(time, one, one);
+       Slice value = testMeta.managedState.getSync(time, one);
+       testMeta.managedState.endWindow();
+
+       Assert.assertEquals("value of one", one, value);
+     } finally {
+       // always release the managed state, even when the assertion fails
+       testMeta.managedState.teardown();
+     }
+   }
+
+   @Test
+   public void testAsyncGet() throws ExecutionException, InterruptedException
+   {
+     Slice one = ManagedStateTestUtils.getSliceFor("1");
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       long time = System.currentTimeMillis();
+       testMeta.managedState.beginWindow(0);
+       testMeta.managedState.put(time, one, one);
+       Future<Slice> valFuture = testMeta.managedState.getAsync(time, one);
+       Slice value = valFuture.get();
+       // close the window that was opened above, mirroring testSimplePutGet
+       testMeta.managedState.endWindow();
+
+       Assert.assertEquals("value of one", one, value);
+     } finally {
+       testMeta.managedState.teardown();
+     }
+   }
+
+   @Test
+   public void testSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
+   {
+     Slice zero = ManagedStateTestUtils.getSliceFor("0");
+     long time = System.currentTimeMillis();
+
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+       Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
+
+       //write data to disk explicitly
+       testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+       ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(),
+           testMeta.operatorContext.getId(), unsavedBucket0, 1);
+
+       Slice value = testMeta.managedState.getSync(time, zero);
+
+       Assert.assertEquals("value of zero", zero, value);
+     } finally {
+       testMeta.managedState.teardown();
+     }
+   }
+
+   @Test
+   public void testAsyncSyncGetFromFiles() throws IOException, ExecutionException, InterruptedException
+   {
+     Slice zero = ManagedStateTestUtils.getSliceFor("0");
+     long time = System.currentTimeMillis();
+
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       long timeBucket = testMeta.managedState.getTimeBucketAssigner().getTimeBucketFor(time);
+       Map<Slice, Bucket.BucketedValue> unsavedBucket0 = ManagedStateTestUtils.getTestBucketData(0, timeBucket);
+
+       //write data to disk explicitly
+       testMeta.managedState.bucketsFileSystem.writeBucketData(time, 0, unsavedBucket0);
+       ManagedStateTestUtils.transferBucketHelper(testMeta.managedState.getFileAccess(),
+           testMeta.operatorContext.getId(), unsavedBucket0, 1);
+
+       Future<Slice> valFuture = testMeta.managedState.getAsync(time, zero);
+
+       Assert.assertEquals("value of zero", zero, valFuture.get());
+     } finally {
+       testMeta.managedState.teardown();
+     }
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
new file mode 100644
index 0000000..8ae4db7
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/MockManagedStateContext.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Comparator;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+ /**
+  * Test double for {@link ManagedStateContext}.
+  * <p>
+  * By default it supplies a {@link TFileImpl.DTFileImpl} file access, a {@link SliceComparator}, a fresh
+  * {@link BucketsFileSystem} and a fresh {@link TimeBucketAssigner}. The operator context is fixed at
+  * construction. Every other collaborator can be replaced through the package-private setters.
+  */
+ class MockManagedStateContext implements ManagedStateContext
+ {
+   private final Context.OperatorContext operatorContext;
+
+   private TFileImpl.DTFileImpl fileAccess = new TFileImpl.DTFileImpl();
+   private Comparator<Slice> keyComparator = new SliceComparator();
+   private BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
+   private TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+
+   public MockManagedStateContext(Context.OperatorContext operatorContext)
+   {
+     this.operatorContext = operatorContext;
+   }
+
+   // ---- ManagedStateContext accessors ----
+
+   @Override
+   public FileAccess getFileAccess()
+   {
+     return this.fileAccess;
+   }
+
+   @Override
+   public Comparator<Slice> getKeyComparator()
+   {
+     return this.keyComparator;
+   }
+
+   public BucketsFileSystem getBucketsFileSystem()
+   {
+     return this.bucketsFileSystem;
+   }
+
+   @Override
+   public TimeBucketAssigner getTimeBucketAssigner()
+   {
+     return this.timeBucketAssigner;
+   }
+
+   @Override
+   public Context.OperatorContext getOperatorContext()
+   {
+     return this.operatorContext;
+   }
+
+   // ---- test hooks for swapping collaborators ----
+
+   void setFileAccess(TFileImpl.DTFileImpl newFileAccess)
+   {
+     this.fileAccess = newFileAccess;
+   }
+
+   void setKeyComparator(Comparator<Slice> newKeyComparator)
+   {
+     this.keyComparator = newKeyComparator;
+   }
+
+   void setBucketsFileSystem(BucketsFileSystem newBucketsFileSystem)
+   {
+     this.bucketsFileSystem = newBucketsFileSystem;
+   }
+
+   void setTimeBucketAssigner(TimeBucketAssigner newTimeBucketAssigner)
+   {
+     this.timeBucketAssigner = newTimeBucketAssigner;
+   }
+ }
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
new file mode 100644
index 0000000..8a3e521
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/StateTrackerTest.java
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+ import java.io.IOException;
+ import java.util.List;
+ import java.util.concurrent.CountDownLatch;
+ import java.util.concurrent.TimeUnit;
+
+ import org.joda.time.Duration;
+ import org.junit.Assert;
+ import org.junit.Rule;
+ import org.junit.Test;
+ import org.junit.rules.TestWatcher;
+ import org.junit.runner.Description;
+
+ import com.google.common.collect.Lists;
+
+ import com.datatorrent.api.Context;
+ import com.datatorrent.lib.fileaccess.FileAccessFSImpl;
+ import com.datatorrent.netlet.util.Slice;
+
+ public class StateTrackerTest
+ {
+   /**
+    * Upper bound on how long a test waits for the state tracker to signal through the latch. If it never
+    * signals, the test fails instead of hanging the build. NOTE(review): chosen to be well above the tracker's
+    * check interval; confirm against ManagedStateImpl's configured interval.
+    */
+   private static final long LATCH_TIMEOUT_SECONDS = 120;
+
+   /**
+    * Per-test fixture. Before each test it creates a {@link MockManagedStateImpl} with 2 buckets and a 100-byte
+    * memory limit, rooted at {@code target/<class>/<method>/bucket_data}, plus an operator context (id 1).
+    * {@link MockDefaultBucket} always reports 600 bytes, which is above that limit. The target directory is
+    * cleaned before and after each test.
+    */
+   static class TestMeta extends TestWatcher
+   {
+     MockManagedStateImpl managedState;
+     Context.OperatorContext operatorContext;
+     String applicationPath;
+
+     @Override
+     protected void starting(Description description)
+     {
+       ManagedStateTestUtils.cleanTargetDir(description);
+       managedState = new MockManagedStateImpl();
+       applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+       ((FileAccessFSImpl)managedState.getFileAccess()).setBasePath(applicationPath + "/" + "bucket_data");
+
+       managedState.setNumBuckets(2);
+       managedState.setMaxMemorySize(100);
+
+       operatorContext = ManagedStateTestUtils.getOperatorContext(1, applicationPath);
+     }
+
+     @Override
+     protected void finished(Description description)
+     {
+       ManagedStateTestUtils.cleanTargetDir(description);
+     }
+   }
+
+   @Rule
+   public TestMeta testMeta = new TestMeta();
+
+   @Test
+   public void testEviction() throws InterruptedException
+   {
+     testMeta.managedState.latch = new CountDownLatch(1);
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       Slice one = ManagedStateTestUtils.getSliceFor("1");
+       testMeta.managedState.beginWindow(System.currentTimeMillis());
+       testMeta.managedState.put(1, one, one);
+       testMeta.managedState.endWindow();
+
+       awaitLatch(testMeta.managedState.latch);
+     } finally {
+       testMeta.managedState.teardown();
+     }
+     Assert.assertEquals("freed bucket", Lists.newArrayList(1L), testMeta.managedState.freedBuckets);
+   }
+
+   @Test
+   public void testMultipleEvictions() throws InterruptedException
+   {
+     testMeta.managedState.latch = new CountDownLatch(2);
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       Slice one = ManagedStateTestUtils.getSliceFor("1");
+       testMeta.managedState.beginWindow(System.currentTimeMillis());
+       testMeta.managedState.put(1, one, one);
+
+       Slice two = ManagedStateTestUtils.getSliceFor("2");
+       testMeta.managedState.put(2, two, two);
+       testMeta.managedState.endWindow();
+
+       awaitLatch(testMeta.managedState.latch);
+     } finally {
+       testMeta.managedState.teardown();
+     }
+     Assert.assertEquals("freed bucket", Lists.newArrayList(1L, 2L), testMeta.managedState.freedBuckets);
+   }
+
+   @Test
+   public void testBucketPrevention() throws InterruptedException
+   {
+     testMeta.managedState.setDurationPreventingFreeingSpace(Duration.standardDays(2));
+     testMeta.managedState.setStateTracker(new MockStateTracker());
+     // counted down by MockStateTracker after one tracker run, whether or not anything was freed
+     testMeta.managedState.latch = new CountDownLatch(1);
+
+     testMeta.managedState.setup(testMeta.operatorContext);
+     try {
+       Slice one = ManagedStateTestUtils.getSliceFor("1");
+       testMeta.managedState.beginWindow(System.currentTimeMillis());
+       testMeta.managedState.put(1, one, one);
+
+       Slice two = ManagedStateTestUtils.getSliceFor("2");
+       testMeta.managedState.put(2, two, two);
+       testMeta.managedState.endWindow();
+
+       awaitLatch(testMeta.managedState.latch);
+     } finally {
+       testMeta.managedState.teardown();
+     }
+     Assert.assertEquals("no buckets triggered", 0, testMeta.managedState.freedBuckets.size());
+   }
+
+   /**
+    * Waits for {@code latch} to reach zero. Fails the test if that does not happen within
+    * {@link #LATCH_TIMEOUT_SECONDS}, instead of blocking indefinitely as a bare {@code await()} would.
+    */
+   private static void awaitLatch(CountDownLatch latch) throws InterruptedException
+   {
+     Assert.assertTrue("timed out waiting for the state tracker",
+         latch.await(LATCH_TIMEOUT_SECONDS, TimeUnit.SECONDS));
+   }
+
+   /**
+    * Managed state that creates {@link MockDefaultBucket}s and exposes a latch and the list of freed bucket ids
+    * to the tests.
+    */
+   private static class MockManagedStateImpl extends ManagedStateImpl
+   {
+     CountDownLatch latch;
+     List<Long> freedBuckets = Lists.newArrayList();
+
+     @Override
+     protected Bucket newBucket(long bucketId)
+     {
+       return new MockDefaultBucket(bucketId);
+     }
+   }
+
+   /**
+    * Bucket that always reports 600 bytes. Each {@link #freeMemory()} call records the bucket id in the owning
+    * {@link MockManagedStateImpl} and counts down its latch.
+    */
+   private static class MockDefaultBucket extends Bucket.DefaultBucket
+   {
+
+     protected MockDefaultBucket(long bucketId)
+     {
+       super(bucketId);
+     }
+
+     @Override
+     public long freeMemory() throws IOException
+     {
+       long freedBytes = super.freeMemory();
+       ((MockManagedStateImpl)managedStateContext).freedBuckets.add(getBucketId());
+       ((MockManagedStateImpl)managedStateContext).latch.countDown();
+       return freedBytes;
+     }
+
+     @Override
+     public long getSizeInBytes()
+     {
+       return 600;
+     }
+   }
+
+   /**
+    * State tracker that counts down the managed state's latch after every run.
+    */
+   private static class MockStateTracker extends StateTracker
+   {
+
+     @Override
+     public void run()
+     {
+       super.run();
+       ((MockManagedStateImpl)managedStateImpl).latch.countDown();
+     }
+   }
+
+ }