You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/03/25 14:18:25 UTC
[3/4] incubator-apex-malhar git commit: APEXMALHAR-1897 added managed
state
APEXMALHAR-1897 added managed state
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a8fbcac6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a8fbcac6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a8fbcac6
Branch: refs/heads/master
Commit: a8fbcac6236e4130cef1e83830e944c4788bbca4
Parents: 5373a3c
Author: Chandni Singh <cs...@apache.org>
Authored: Sun Dec 13 03:13:08 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Fri Mar 25 00:04:04 2016 -0700
----------------------------------------------------------------------
.../datatorrent/lib/fileaccess/FileAccess.java | 3 +
.../lib/fileaccess/FileAccessFSImpl.java | 11 +-
.../apex/malhar/lib/state/BucketedState.java | 72 +++
.../lib/state/TimeSlicedBucketedState.java | 104 ++++
.../state/managed/AbstractManagedStateImpl.java | 583 +++++++++++++++++++
.../apex/malhar/lib/state/managed/Bucket.java | 525 +++++++++++++++++
.../lib/state/managed/BucketsFileSystem.java | 566 ++++++++++++++++++
.../managed/IncrementalCheckpointManager.java | 213 +++++++
.../malhar/lib/state/managed/ManagedState.java | 32 +
.../state/managed/ManagedStateComponent.java | 36 ++
.../lib/state/managed/ManagedStateContext.java | 38 ++
.../lib/state/managed/ManagedStateImpl.java | 103 ++++
.../lib/state/managed/ManagedTimeStateImpl.java | 103 ++++
.../managed/ManagedTimeUnifiedStateImpl.java | 213 +++++++
.../malhar/lib/state/managed/StateTracker.java | 194 ++++++
.../lib/state/managed/TimeBucketAssigner.java | 242 ++++++++
.../malhar/lib/state/managed/package-info.java | 22 +
.../apex/malhar/lib/state/package-info.java | 22 +
.../state/managed/BucketsFileSystemTest.java | 166 ++++++
.../lib/state/managed/DefaultBucketTest.java | 203 +++++++
.../IncrementalCheckpointManagerTest.java | 196 +++++++
.../lib/state/managed/ManagedStateImplTest.java | 182 ++++++
.../state/managed/ManagedStateTestUtils.java | 141 +++++
.../state/managed/ManagedTimeStateImplTest.java | 151 +++++
.../ManagedTimeUnifiedStateImplTest.java | 149 +++++
.../state/managed/MockManagedStateContext.java | 91 +++
.../lib/state/managed/StateTrackerTest.java | 174 ++++++
.../state/managed/TimeBucketAssignerTest.java | 123 ++++
28 files changed, 4654 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
index d4c7810..f8dd0be 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccess.java
@@ -52,8 +52,11 @@ public interface FileAccess extends Closeable
* @throws IOException
*/
void rename(long bucketKey, String oldName, String newName) throws IOException;
+
void delete(long bucketKey, String fileName) throws IOException;
+ void deleteBucket(long bucketKey) throws IOException;
+
long getFileSize(long bucketKey, String s) throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
index 74ab238..a9cfe00 100644
--- a/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
+++ b/library/src/main/java/com/datatorrent/lib/fileaccess/FileAccessFSImpl.java
@@ -135,10 +135,13 @@ public abstract class FileAccessFSImpl implements FileAccess
public RemoteIterator<LocatedFileStatus> listFiles(long bucketKey) throws IOException
{
Path bucketPath = getBucketPath(bucketKey);
- if (!fs.exists(bucketPath)) {
- return null;
- }
- return fs.listFiles(bucketPath, true);
+ return fs.exists(bucketPath) ? fs.listFiles(bucketPath, true) : null;
+ }
+
+ @Override
+ public void deleteBucket(long bucketKey) throws IOException
+ {
+ fs.delete(getBucketPath(bucketKey), true);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
new file mode 100644
index 0000000..a270eb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/BucketedState.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A state where keys are grouped in buckets.
+ */
+public interface BucketedState
+{
+ /**
+ * An expired value. In implementations where the bucketId is time related, an event can be too old; in that
+ * case the get methods, getSync and getAsync, return this fixed slice instance.<br/>
+ * In the usages, comparisons with EXPIRED should be made using <code>==</code> instead of <code>equals</code>.
+ */
+ Slice EXPIRED = new Slice(null, -1, -1);
+
+ /**
+ * Sets the value of the key in bucket identified by bucketId.
+ *
+ * @param bucketId identifier of the bucket.
+ * @param key key (not null)
+ * @param value value (not null)
+ */
+ void put(long bucketId, @NotNull Slice key, @NotNull Slice value);
+
+ /**
+ * Returns the value of the key in a bucket identified by bucketId. Fetching a key can be expensive if the key
+ * is not in memory and is present on disk. This fetches the key synchronously. <br/>
+ * {@link #getAsync(long, Slice)} is recommended for efficiently reading the value of a key.
+ *
+ * @param bucketId identifier of the bucket.
+ * @param key key (not null)
+ *
+ * @return value of the key if found; null if the key is not found;
+ * {@link #EXPIRED} if the bucketId is time based and very old.
+ */
+ Slice getSync(long bucketId, @NotNull Slice key);
+
+ /**
+ * Returns a future through which the value of the key is obtained.<br/>
+ * @param bucketId identifier of the bucket.
+ * @param key key (not null)
+ *
+ * @return future of the value of the key if found; the future yields null if the key is not found and
+ * {@link #EXPIRED} if the bucketId is time based and very old.
+ */
+ Future<Slice> getAsync(long bucketId, @NotNull Slice key);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
new file mode 100644
index 0000000..55b92a3
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/TimeSlicedBucketedState.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A type of bucketed state where a bucket's data is further divided into time buckets. This requires
+ * time per key to figure out which time bucket a particular key belongs to.
+ * <p/>
+ * The time here is mainly used for purging of aged key/value pair.
+ */
+public interface TimeSlicedBucketedState
+{
+ /**
+ * Sets the value of a key in the bucket identified by bucketId. Time is used to derive which time bucket (within
+ * the main bucket) a key belongs to.
+ *
+ * @param bucketId identifier of the bucket.
+ * @param time time associated with the key.
+ * @param key key (not null)
+ * @param value value (not null)
+ */
+ void put(long bucketId, long time, @NotNull Slice key, @NotNull Slice value);
+
+ /**
+ * Returns the value of the key in the bucket identified by bucketId.<br/>
+ * If the value of the key is not present in the bucket cache then this scans all the time bucket files on disk from
+ * the latest to the oldest.
+ *
+ * It retrieves the value synchronously which can be expensive.<br/>
+ * {@link #getAsync(long, Slice)} is recommended for efficiently reading the value of a key.
+ *
+ *
+ * @param bucketId identifier of the bucket
+ * @param key key (not null)
+ *
+ * @return value of the key if found; null if the key is not found;
+ */
+ Slice getSync(long bucketId, @NotNull Slice key);
+
+
+ /**
+ * Returns the value of key in the bucket identified by bucketId.<br/>
+ * If the value of the key is not present in the bucket cache then this will use the time to derive the time
+ * bucket and just search for the key in a particular time bucket file.<br/>
+ *
+ * It retrieves the value synchronously which can be expensive.<br/>
+ * {@link #getAsync(long, long, Slice)} is recommended for efficiently reading the value of a key.
+ *
+ * @param bucketId identifier of the bucket.
+ * @param time time for deriving the time bucket.
+ * @param key key (not null)
+ *
+ * @return value of the key if found; null if the key is not found; {@link BucketedState#EXPIRED} if the time is old.
+ */
+ Slice getSync(long bucketId, long time, @NotNull Slice key);
+
+ /**
+ * Returns the future using which the value is obtained.<br/>
+ * If the value of the key is not present in the bucket cache then this searches for it in all the time buckets on
+ * disk.<br/>
+ * Time-buckets are looked-up in order from newest to oldest.
+ *
+ * @param bucketId identifier of the bucket.
+ * @param key key (not null)
+ *
+ * @return value of the key if found; null if the key is not found;
+ */
+ Future<Slice> getAsync(long bucketId, @NotNull Slice key);
+
+ /**
+ * Returns the future using which the value is obtained.<br/>
+ * If the value of the key is not present in the bucket cache then this will use the time to derive the time
+ * bucket and just search for the key in a particular time bucket file.<br/>
+ *
+ * @param bucketId identifier of the bucket.
+ * @param time time associated with the key.
+ * @param key key (not null)
+ *
+ * @return value of the key if found; null if the key is not found; {@link BucketedState#EXPIRED} if time is very old.
+ */
+ Future<Slice> getAsync(long bucketId, long time, @NotNull Slice key);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
new file mode 100644
index 0000000..11db44d
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/AbstractManagedStateImpl.java
@@ -0,0 +1,583 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Comparator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.esotericsoftware.kryo.serializers.FieldSerializer;
+import com.esotericsoftware.kryo.serializers.JavaSerializer;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ListMultimap;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Multimaps;
+import com.google.common.util.concurrent.Futures;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context.DAGContext;
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.Operator;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.lib.fileaccess.TFileImpl;
+import com.datatorrent.lib.util.comparator.SliceComparator;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * An abstract implementation of managed state.<br/>
+ *
+ * The important sub-components here are:
+ * <ol>
+ * <li>
+ * {@link #checkpointManager}: writes incremental checkpoints in window files and transfers data from window
+ * files to bucket files.
+ * </li>
+ * <li>
+ * {@link #bucketsFileSystem}: manages writing/reading from all the buckets. A bucket on disk is further sub-divided
+ * into time-buckets. This abstracts out updating time-buckets and meta files and reading from them.
+ * </li>
+ * <li>
+ * {@link #timeBucketAssigner}: assigns time-buckets to keys and manages the time boundaries.
+ * </li>
+ * <li>
+ * {@link #stateTracker}: tracks the size of data in memory and requests buckets to free memory when enough memory
+ * is not available.
+ * </li>
+ * <li>
+ * {@link #fileAccess}: pluggable file system abstraction.
+ * </li>
+ * </ol>
+ * <p/>
+ * <b>Differences between different concrete implementations of {@link AbstractManagedStateImpl}</b>
+ * <table>
+ * <tr>
+ * <td></td>
+ * <td>{@link ManagedStateImpl}</td>
+ * <td>{@link ManagedTimeStateImpl}</td>
+ * <td>{@link ManagedTimeUnifiedStateImpl}</td>
+ * </tr>
+ * <tr>
+ * <td>Main buckets</td>
+ * <td>identified by unique adhoc long ids that the user provides with the key.</td>
+ * <td>same as ManagedStateImpl.</td>
+ * <td>user doesn't provide bucket ids and instead just provides time. Time is used to derive the time buckets
+ * and these are the main buckets.</td>
+ * </tr>
+ * <tr>
+ * <td>Data on disk: data in buckets is persisted on disk with each bucket data further divided into
+ * time-buckets, i.e., {base_path}/{bucketId}/{time-bucket id}</td>
+ * <td>time-bucket is computed using the system time corresponding to the application window.</td>
+ * <td>time-bucket is derived from the user provided time.</td>
+ * <td>time-bucket is derived from the user provided time.
+ * In this implementation operator id is used to isolate data of different partitions on disk, i.e.,
+ * {base_path}/{operatorId}/{time-bucket id}</td>
+ * </tr>
+ * <tr>
+ * <td>Bucket partitioning</td>
+ * <td>bucket belongs to just one partition. Multiple partitions cannot write to the same bucket.</td>
+ * <td>same as ManagedStateImpl.</td>
+ * <td>multiple partitions can be working with the same time-bucket since time-bucket is derived from time.
+ * This works because on disk each partition's data is segregated by the operator id.</td>
+ * </tr>
+ * <tr>
+ * <td>Dynamic partitioning</td>
+ * <td>can support dynamic partitioning by pre-allocating buckets.</td>
+ * <td>same as ManagedStateImpl.</td>
+ * <td>will not be able to support dynamic partitioning efficiently.</td>
+ * </tr>
+ * </table>
+ *
+ */
+public abstract class AbstractManagedStateImpl
+ implements ManagedState, Component<OperatorContext>, Operator.CheckpointNotificationListener, ManagedStateContext,
+ TimeBucketAssigner.PurgeListener
+{
+ private long maxMemorySize;
+
+ protected int numBuckets;
+
+ @NotNull
+ private FileAccess fileAccess = new TFileImpl.DTFileImpl();
+ @NotNull
+ protected TimeBucketAssigner timeBucketAssigner = new TimeBucketAssigner();
+
+ protected Bucket[] buckets;
+
+ @Min(1)
+ private int numReaders = 1;
+ @NotNull
+ protected transient ExecutorService readerService;
+
+ @NotNull
+ protected IncrementalCheckpointManager checkpointManager = new IncrementalCheckpointManager();
+
+ @NotNull
+ protected BucketsFileSystem bucketsFileSystem = new BucketsFileSystem();
+
+ protected transient OperatorContext operatorContext;
+
+ @NotNull
+ protected Comparator<Slice> keyComparator = new SliceComparator();
+
+ protected final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
+
+ @NotNull
+ @FieldSerializer.Bind(JavaSerializer.class)
+ private Duration checkStateSizeInterval = Duration.millis(
+ DAGContext.STREAMING_WINDOW_SIZE_MILLIS.defaultValue * OperatorContext.APPLICATION_WINDOW_COUNT.defaultValue);
+
+ @FieldSerializer.Bind(JavaSerializer.class)
+ private Duration durationPreventingFreeingSpace;
+
+ private transient StateTracker stateTracker = new StateTracker();
+
+ //accessible to StateTracker
+ final transient Object commitLock = new Object();
+
+ protected final transient ListMultimap<Long, ValueFetchTask> tasksPerBucketId =
+ Multimaps.synchronizedListMultimap(ArrayListMultimap.<Long, ValueFetchTask>create());
+
+ @Override
+ public void setup(OperatorContext context)
+ {
+ operatorContext = context;
+ fileAccess.init();
+
+ timeBucketAssigner.setPurgeListener(this);
+
+ //setup all the managed state components
+ timeBucketAssigner.setup(this);
+ checkpointManager.setup(this);
+ bucketsFileSystem.setup(this);
+
+ if (buckets == null) {
+ //create buckets array only once at start when it is not created.
+ numBuckets = getNumBuckets();
+ buckets = new Bucket[numBuckets];
+ }
+ for (Bucket bucket : buckets) {
+ if (bucket != null) {
+ bucket.setup(this);
+ }
+ }
+
+ stateTracker.setup(this);
+ long activationWindow = context.getValue(OperatorContext.ACTIVATION_WINDOW_ID);
+
+ if (activationWindow != Stateless.WINDOW_ID) {
+ //delete all the wal files with windows > activationWindow.
+ //All the wal files with windows <= activationWindow are loaded and kept separately as recovered data.
+ try {
+ for (long recoveredWindow : checkpointManager.getWindowIds(operatorContext.getId())) {
+ if (recoveredWindow <= activationWindow) {
+ @SuppressWarnings("unchecked")
+ Map<Long, Map<Slice, Bucket.BucketedValue>> recoveredData = (Map<Long, Map<Slice, Bucket.BucketedValue>>)
+ checkpointManager.load(operatorContext.getId(), recoveredWindow);
+ if (recoveredData != null && !recoveredData.isEmpty()) {
+ for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> entry : recoveredData.entrySet()) {
+ int bucketIdx = prepareBucket(entry.getKey());
+ buckets[bucketIdx].recoveredData(recoveredWindow, entry.getValue());
+ }
+ }
+ checkpointManager.save(recoveredData, operatorContext.getId(), recoveredWindow,
+ true /*skipWritingToWindowFile*/);
+
+ } else {
+ checkpointManager.delete(operatorContext.getId(), recoveredWindow);
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("recovering", e);
+ }
+ }
+
+ readerService = Executors.newFixedThreadPool(numReaders, new NameableThreadFactory("managedStateReaders"));
+ }
+
+ /**
+ * Gets the number of buckets which is required during setup to create the array of buckets.<br/>
+ * {@link ManagedTimeStateImpl} provides num of buckets which is injected using a property.<br/>
+ * {@link ManagedTimeUnifiedStateImpl} provides num of buckets which are calculated based on time settings.
+ *
+ * @return number of buckets.
+ */
+ public abstract int getNumBuckets();
+
+ public void beginWindow(long windowId)
+ {
+ if (throwable.get() != null) {
+ Throwables.propagate(throwable.get());
+ }
+ timeBucketAssigner.beginWindow(windowId);
+ }
+
+
+ /**
+ * Prepares the bucket and returns its index.
+ * @param bucketId bucket key
+ * @return bucket index
+ */
+ protected int prepareBucket(long bucketId)
+ {
+ stateTracker.bucketAccessed(bucketId);
+ int bucketIdx = getBucketIdx(bucketId);
+
+ Bucket bucket = buckets[bucketIdx];
+ if (bucket == null) {
+ //bucket is not in memory
+ bucket = newBucket(bucketId);
+ bucket.setup(this);
+ buckets[bucketIdx] = bucket;
+ } else if (bucket.getBucketId() != bucketId) {
+ handleBucketConflict(bucketIdx, bucketId);
+ }
+ return bucketIdx;
+ }
+
+ protected void putInBucket(long bucketId, long timeBucket, @NotNull Slice key, @NotNull Slice value)
+ {
+ Preconditions.checkNotNull(key, "key");
+ Preconditions.checkNotNull(value, "value");
+ if (timeBucket != -1) {
+ //a timeBucket of -1 marks an invalid time bucket, for which the data is not stored; any other value is stored
+ int bucketIdx = prepareBucket(bucketId);
+ buckets[bucketIdx].put(key, timeBucket, value);
+ }
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ protected Slice getValueFromBucketSync(long bucketId, long timeBucket, @NotNull Slice key)
+ {
+ Preconditions.checkNotNull(key, "key");
+ int bucketIdx = prepareBucket(bucketId);
+ Bucket bucket = buckets[bucketIdx];
+ synchronized (bucket) {
+ return bucket.get(key, timeBucket, Bucket.ReadSource.ALL);
+ }
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ protected Future<Slice> getValueFromBucketAsync(long bucketId, long timeBucket, @NotNull Slice key)
+ {
+ Preconditions.checkNotNull(key, "key");
+ int bucketIdx = prepareBucket(bucketId);
+ Bucket bucket = buckets[bucketIdx];
+ synchronized (bucket) {
+ Slice cachedVal = bucket.get(key, timeBucket, Bucket.ReadSource.MEMORY);
+ if (cachedVal != null) {
+ return Futures.immediateFuture(cachedVal);
+ }
+ ValueFetchTask valueFetchTask = new ValueFetchTask(bucket, key, timeBucket, this);
+ tasksPerBucketId.put(bucket.getBucketId(), valueFetchTask);
+ return readerService.submit(valueFetchTask);
+ }
+ }
+
+ protected void handleBucketConflict(int bucketIdx, long newBucketId)
+ {
+ throw new IllegalArgumentException("bucket conflict " + buckets[bucketIdx].getBucketId() + " " + newBucketId);
+ }
+
+ protected int getBucketIdx(long bucketId)
+ {
+ return (int)Math.abs(bucketId % numBuckets); //% keeps the sign of a negative id; abs keeps the index >= 0
+ }
+
+ Bucket getBucket(long bucketId)
+ {
+ return buckets[getBucketIdx(bucketId)];
+ }
+
+ protected Bucket newBucket(long bucketId)
+ {
+ return new Bucket.DefaultBucket(bucketId);
+ }
+
+ public void endWindow()
+ {
+ timeBucketAssigner.endWindow();
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ @Override
+ public void beforeCheckpoint(long windowId)
+ {
+ Map<Long, Map<Slice, Bucket.BucketedValue>> flashData = Maps.newHashMap();
+
+ for (Bucket bucket : buckets) {
+ if (bucket != null) {
+ synchronized (bucket) {
+ Map<Slice, Bucket.BucketedValue> flashDataForBucket = bucket.checkpoint(windowId);
+ if (!flashDataForBucket.isEmpty()) {
+ flashData.put(bucket.getBucketId(), flashDataForBucket);
+ }
+ }
+ }
+ }
+ if (!flashData.isEmpty()) {
+ try {
+ checkpointManager.save(flashData, operatorContext.getId(), windowId, false);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Override
+ public void checkpointed(long windowId)
+ {
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ @Override
+ public void committed(long windowId)
+ {
+ synchronized (commitLock) {
+ try {
+ for (Bucket bucket : buckets) {
+ if (bucket != null) {
+ synchronized (bucket) {
+ bucket.committed(windowId);
+ }
+ }
+ }
+ checkpointManager.committed(operatorContext.getId(), windowId);
+ } catch (IOException | InterruptedException e) {
+ throw new RuntimeException("committing " + windowId, e);
+ }
+ }
+ }
+
+ @SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
+ @Override
+ public void teardown()
+ {
+ checkpointManager.teardown();
+ bucketsFileSystem.teardown();
+ timeBucketAssigner.teardown();
+ readerService.shutdownNow();
+ for (Bucket bucket : buckets) {
+ if (bucket != null) {
+ synchronized (bucket) {
+ bucket.teardown();
+ }
+ }
+ }
+ stateTracker.teardown();
+ }
+
+ @Override
+ public void purgeTimeBucketsLessThanEqualTo(long timeBucket)
+ {
+ checkpointManager.setLatestExpiredTimeBucket(timeBucket);
+ }
+
+ @Override
+ public OperatorContext getOperatorContext()
+ {
+ return operatorContext;
+ }
+
+ @Override
+ public void setMaxMemorySize(long bytes)
+ {
+ maxMemorySize = bytes;
+ }
+
+ /**
+ *
+ * @return the optimal size of the cache that triggers eviction of committed data from memory.
+ */
+ public long getMaxMemorySize()
+ {
+ return maxMemorySize;
+ }
+
+ /**
+ * Sets the {@link FileAccess} implementation.
+ * @param fileAccess specific implementation of FileAccess.
+ */
+ public void setFileAccess(@NotNull FileAccess fileAccess)
+ {
+ this.fileAccess = Preconditions.checkNotNull(fileAccess);
+ }
+
+ @Override
+ public FileAccess getFileAccess()
+ {
+ return fileAccess;
+ }
+
+ /**
+ * Sets the time bucket assigner. This can be used for plugging any custom time bucket assigner.
+ *
+ * @param timeBucketAssigner a {@link TimeBucketAssigner}
+ */
+ public void setTimeBucketAssigner(@NotNull TimeBucketAssigner timeBucketAssigner)
+ {
+ this.timeBucketAssigner = Preconditions.checkNotNull(timeBucketAssigner);
+ }
+
+ @Override
+ public TimeBucketAssigner getTimeBucketAssigner()
+ {
+ return timeBucketAssigner;
+ }
+
+ @Override
+ public Comparator<Slice> getKeyComparator()
+ {
+ return keyComparator;
+ }
+
+ /**
+ * Sets the key comparator. The keys on the disk in time bucket files are sorted. This sets the comparator for the
+ * key.
+ * @param keyComparator key comparator
+ */
+ public void setKeyComparator(@NotNull Comparator<Slice> keyComparator)
+ {
+ this.keyComparator = Preconditions.checkNotNull(keyComparator);
+ }
+
+ public BucketsFileSystem getBucketsFileSystem()
+ {
+ return bucketsFileSystem;
+ }
+
+ /**
+ * @return number of worker threads in the reader service.
+ */
+ public int getNumReaders()
+ {
+ return numReaders;
+ }
+
+ /**
+ * Sets the number of worker threads in the reader service which is responsible for asynchronously fetching
+ * values of the keys. This should not exceed number of buckets.
+ *
+ * @param numReaders number of worker threads in the reader service.
+ */
+ public void setNumReaders(int numReaders)
+ {
+ this.numReaders = numReaders;
+ }
+
+ /**
+ * @return regular interval at which the size of state is checked.
+ */
+ public Duration getCheckStateSizeInterval()
+ {
+ return checkStateSizeInterval;
+ }
+
+ /**
+ * Sets the interval at which the size of state is regularly checked.
+ *
+ * @param checkStateSizeInterval regular interval at which the size of state is checked.
+ */
+ public void setCheckStateSizeInterval(@NotNull Duration checkStateSizeInterval)
+ {
+ this.checkStateSizeInterval = Preconditions.checkNotNull(checkStateSizeInterval);
+ }
+
+ /**
+ * @return duration which prevents a bucket being evicted.
+ */
+ public Duration getDurationPreventingFreeingSpace()
+ {
+ return durationPreventingFreeingSpace;
+ }
+
+ /**
+ * Sets the duration which prevents buckets from freeing space. For example, if this is set to an hour, then only
+ * buckets which were not accessed in the last hour will be triggered to free space.
+ *
+ * @param durationPreventingFreeingSpace time duration
+ */
+ public void setDurationPreventingFreeingSpace(Duration durationPreventingFreeingSpace)
+ {
+ this.durationPreventingFreeingSpace = durationPreventingFreeingSpace;
+ }
+
+ static class ValueFetchTask implements Callable<Slice>
+ {
+ private final Bucket bucket;
+ private final long timeBucketId;
+ private final Slice key;
+ private final AbstractManagedStateImpl managedState;
+
+ ValueFetchTask(@NotNull Bucket bucket, @NotNull Slice key, long timeBucketId, AbstractManagedStateImpl managedState)
+ {
+ this.bucket = Preconditions.checkNotNull(bucket);
+ this.timeBucketId = timeBucketId;
+ this.key = Preconditions.checkNotNull(key);
+ this.managedState = Preconditions.checkNotNull(managedState);
+ }
+
+ @Override
+ public Slice call() throws Exception
+ {
+ try {
+ synchronized (bucket) {
+ //a particular bucket should only be handled by one thread at any point of time. Handling of bucket here
+ //involves creating readers for the time buckets and de-serializing key/value from a reader.
+ Slice value = bucket.get(key, timeBucketId, Bucket.ReadSource.ALL);
+ managedState.tasksPerBucketId.remove(bucket.getBucketId(), this);
+ return value;
+ }
+ } catch (Throwable t) {
+ managedState.throwable.set(t);
+ throw Throwables.propagate(t);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ void setStateTracker(@NotNull StateTracker stateTracker)
+ {
+ this.stateTracker = Preconditions.checkNotNull(stateTracker, "state tracker");
+ }
+
+ @VisibleForTesting
+ void setBucketsFileSystem(@NotNull BucketsFileSystem bucketsFileSystem)
+ {
+ this.bucketsFileSystem = Preconditions.checkNotNull(bucketsFileSystem, "buckets file system");
+ }
+
+ private static final transient Logger LOG = LoggerFactory.getLogger(AbstractManagedStateImpl.class);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
new file mode 100644
index 0000000..b2c1618
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java
@@ -0,0 +1,525 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * A bucket that groups events.
+ */
+public interface Bucket extends ManagedStateComponent
+{
+ /**
+ * @return bucket id
+ */
+ long getBucketId();
+
+ /**
+ *
+ * @return size of bucket in memory.
+ */
+ long getSizeInBytes();
+
+ /**
+ * Get value of a key.
+ *
+ * @param key key.
+ * @param timeBucket time bucket of the key if known; -1 otherwise.
+ * @param source source to read from
+ * @return value of the key; {@link DefaultBucket} returns null when the key is not found in the source.
+ */
+ Slice get(Slice key, long timeBucket, ReadSource source);
+
+ /**
+ * Set value of a key.
+ *
+ * @param key key.
+ * @param timeBucket timeBucket of the key.
+ * @param value value of the key.
+ */
+ void put(Slice key, long timeBucket, Slice value);
+
+ /**
+ * Triggers the bucket to checkpoint. Returns the non checkpointed data so far.
+ *
+ * @param windowId window id of the checkpoint.
+ * @return non checkpointed data.
+ */
+ Map<Slice, BucketedValue> checkpoint(long windowId);
+
+ /**
+ * Triggers the bucket to commit data till provided window id.
+ *
+ * @param windowId window id
+ */
+ void committed(long windowId);
+
+ /**
+ * Triggers bucket to free memory which is already persisted in bucket data files.
+ *
+ * @return amount of memory freed in bytes.
+ * @throws IOException
+ */
+ long freeMemory() throws IOException;
+
+ /**
+ * Allows the bucket to process/cache data which is recovered (from window files) after failure.
+ *
+ * @param windowId recovery window
+ * @param recoveredData recovered data
+ */
+ void recoveredData(long windowId, Map<Slice, Bucket.BucketedValue> recoveredData);
+
+ /**
+ * Sources a bucket can be asked to read a key from.
+ */
+ enum ReadSource
+ {
+ MEMORY, //state in memory in key/value form
+ READERS, //these are streams in which the key will be searched and de-serialized.
+ ALL //both the above states.
+ }
+
+ /**
+  * A value together with the time bucket it belongs to.<br/>
+  * The value may be null: the no-arg constructor leaves it unset, so {@link #equals(Object)} and
+  * {@link #hashCode()} both have to tolerate a null value.
+  */
+ class BucketedValue
+ {
+   private long timeBucket;
+   private Slice value;
+
+   protected BucketedValue()
+   {
+   }
+
+   protected BucketedValue(long timeBucket, Slice value)
+   {
+     this.timeBucket = timeBucket;
+     this.value = value;
+   }
+
+   /**
+    * @return time bucket of the value.
+    */
+   protected long getTimeBucket()
+   {
+     return timeBucket;
+   }
+
+   protected void setTimeBucket(long timeBucket)
+   {
+     this.timeBucket = timeBucket;
+   }
+
+   /**
+    * @return the value; null when it was never set.
+    */
+   public Slice getValue()
+   {
+     return value;
+   }
+
+   public void setValue(Slice value)
+   {
+     this.value = value;
+   }
+
+   /**
+    * Two bucketed values are equal when both the time bucket and the value are equal. Null values are
+    * compared safely.
+    */
+   @Override
+   public boolean equals(Object o)
+   {
+     if (this == o) {
+       return true;
+     }
+     if (!(o instanceof BucketedValue)) {
+       return false;
+     }
+
+     BucketedValue that = (BucketedValue)o;
+
+     //Objects.equals keeps this consistent with hashCode(), which already accepts a null value.
+     return timeBucket == that.timeBucket && Objects.equals(value, that.value);
+   }
+
+   @Override
+   public int hashCode()
+   {
+     return Objects.hash(timeBucket, value);
+   }
+ }
+
+ /**
+ * Default bucket.<br/>
+ * Not thread-safe.
+ */
+ class DefaultBucket implements Bucket
+ {
+ private final long bucketId;
+
+ //Key -> Ordered values
+ private transient Map<Slice, BucketedValue> flash = Maps.newConcurrentMap();
+
+ //Data persisted in write ahead logs. window -> bucket
+ private final transient ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> checkpointedData =
+ new ConcurrentSkipListMap<>();
+
+ //Data persisted in bucket data files
+ private final transient Map<Slice, BucketedValue> committedData = Maps.newConcurrentMap();
+
+ //Data serialized/deserialized from bucket data files: key -> value from latest time bucket on file
+ private final transient Map<Slice, BucketedValue> fileCache = Maps.newConcurrentMap();
+
+ //TimeBucket -> FileReaders
+ private final transient Map<Long, FileAccess.FileReader> readers = Maps.newTreeMap();
+
+ protected transient ManagedStateContext managedStateContext;
+
+ private AtomicLong sizeInBytes = new AtomicLong(0);
+
+ private final transient Slice dummyGetKey = new Slice(null, 0, 0);
+
+ private transient TreeSet<BucketsFileSystem.TimeBucketMeta> cachedBucketMetas;
+
+ private DefaultBucket()
+ {
+ //for kryo
+ bucketId = -1;
+ }
+
+ /**
+ * Creates a bucket.
+ *
+ * @param bucketId id of the bucket.
+ */
+ protected DefaultBucket(long bucketId)
+ {
+ this.bucketId = bucketId;
+ }
+
+ /**
+ * Stores the managed state context which this bucket uses to reach the buckets file system and the key comparator.
+ *
+ * @param managedStateContext managed state context; must not be null.
+ */
+ @Override
+ public void setup(@NotNull ManagedStateContext managedStateContext)
+ {
+ this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+ }
+
+ @Override
+ public long getBucketId()
+ {
+ return bucketId;
+ }
+
+ /**
+ * @return current value of the size counter maintained by put, loadFileReader and freeMemory.
+ */
+ @Override
+ public long getSizeInBytes()
+ {
+ return sizeInBytes.longValue();
+ }
+
+ /**
+  * Looks the key up in the state held in memory, most recent data first: the un-checkpointed data, then the
+  * check-pointed windows from the latest to the oldest, then the committed data and finally the file cache.
+  *
+  * @param key key
+  * @return value of the key; null if the key is not in memory.
+  */
+ private Slice getFromMemory(Slice key)
+ {
+   //search the cache for key
+   BucketedValue bucketedValue = flash.get(key);
+   if (bucketedValue != null) {
+     return bucketedValue.getValue();
+   }
+
+   //traverse the checkpointed data in reverse order. The values view is iterated directly instead of looking each
+   //window up again by its id: a window which gets removed in between would otherwise yield a null map.
+   for (Map<Slice, BucketedValue> windowData : checkpointedData.descendingMap().values()) {
+     bucketedValue = windowData.get(key);
+     if (bucketedValue != null) {
+       return bucketedValue.getValue();
+     }
+   }
+
+   bucketedValue = committedData.get(key);
+   if (bucketedValue != null) {
+     return bucketedValue.getValue();
+   }
+
+   bucketedValue = fileCache.get(key);
+   if (bucketedValue != null) {
+     return bucketedValue.getValue();
+   }
+
+   return null;
+ }
+
+ /**
+ * Searches the key in the time bucket files of this bucket. The time bucket metas are fetched from the buckets
+ * file system once and cached until {@link #committed(long)} resets them.
+ *
+ * @param key key
+ * @param timeBucket time bucket to search in; -1 to search all the time buckets.
+ * @return value of the key; null if it is not found.
+ */
+ private Slice getFromReaders(Slice key, long timeBucket)
+ {
+ try {
+ if (cachedBucketMetas == null) {
+ cachedBucketMetas = managedStateContext.getBucketsFileSystem().getAllTimeBuckets(bucketId);
+ }
+ if (timeBucket != -1) {
+ Slice valSlice = getValueFromTimeBucketReader(key, timeBucket);
+ if (valSlice != null) {
+ if (timeBucket == cachedBucketMetas.first().getTimeBucketId()) {
+ //if the requested time bucket is the latest time bucket on file, the key/value is put in the file cache.
+ BucketedValue bucketedValue = new BucketedValue(timeBucket, valSlice);
+ fileCache.put(key, bucketedValue);
+ }
+ }
+ return valSlice;
+ } else {
+ //search all the time buckets
+ for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) {
+
+ if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) {
+ //keys in the time bucket files are sorted so if the first key in the file is greater than the key being
+ //searched, the key will not be present in that file.
+ Slice valSlice = getValueFromTimeBucketReader(key, immutableTimeBucketMeta.getTimeBucketId());
+ if (valSlice != null) {
+ BucketedValue bucketedValue = new BucketedValue(immutableTimeBucketMeta.getTimeBucketId(), valSlice);
+ //The time buckets are visited from the latest to the oldest, so the first hit is the value from the
+ //latest time bucket on file which contains the key; that value is put in the file cache.
+ fileCache.put(key, bucketedValue);
+ return valSlice;
+ }
+ }
+ }
+ return null;
+ }
+
+ } catch (IOException e) {
+ throw new RuntimeException("get time-buckets " + bucketId, e);
+ }
+ }
+
+ /**
+  * Reads the value of a key from the requested source. For {@link ReadSource#ALL} the memory is searched first
+  * and the time bucket files are consulted only when the key is not in memory.
+  */
+ @Override
+ public Slice get(Slice key, long timeBucket, ReadSource readSource)
+ {
+   switch (readSource) {
+     case MEMORY:
+       return getFromMemory(key);
+     case READERS:
+       return getFromReaders(key, timeBucket);
+     case ALL:
+     default:
+       break;
+   }
+   //ALL: memory first, readers only on a miss
+   Slice inMemory = getFromMemory(key);
+   return inMemory != null ? inMemory : getFromReaders(key, timeBucket);
+ }
+
+ /**
+  * Returns the value for the key from a time-bucket reader. The reader of the time bucket is loaded on demand
+  * when it is not open yet.
+  *
+  * @param key key
+  * @param timeBucket time bucket
+  * @return value if key is found in the time bucket; null otherwise
+  */
+ private Slice getValueFromTimeBucketReader(Slice key, long timeBucket)
+ {
+   FileAccess.FileReader fileReader = readers.get(timeBucket);
+   if (fileReader == null) {
+     //file reader is not loaded yet
+     boolean loaded;
+     try {
+       loaded = loadFileReader(timeBucket);
+     } catch (IOException e) {
+       throw new RuntimeException("while loading " + bucketId + ", " + timeBucket, e);
+     }
+     if (!loaded) {
+       return null;
+     }
+     fileReader = readers.get(timeBucket);
+   }
+   return readValue(fileReader, key, timeBucket);
+ }
+
+ /**
+  * Seeks the key in the reader and reads its value.
+  *
+  * @param fileReader reader of the time bucket file
+  * @param key key
+  * @param timeBucket time bucket; only used in the failure message
+  * @return value of the key; null when the seek does not find the key.
+  */
+ private Slice readValue(FileAccess.FileReader fileReader, Slice key, long timeBucket)
+ {
+   try {
+     if (!fileReader.seek(key)) {
+       return null;
+     }
+     Slice found = new Slice(null, 0, 0);
+     //the key read back from the file is not needed, so it goes into the shared dummy slice
+     fileReader.next(dummyGetKey, found);
+     return found;
+   } catch (IOException e) {
+     throw new RuntimeException("reading " + bucketId + ", " + timeBucket, e);
+   }
+ }
+
+ /**
+  * Opens the reader of a time bucket file and registers it in {@link #readers}. The size recorded in the time
+  * bucket meta is added to the size of this bucket.
+  *
+  * @param timeBucketId time bucket id
+  * @return true if the reader was loaded; false if the buckets file system has no meta for the time bucket.
+  * @throws IOException
+  */
+ private boolean loadFileReader(long timeBucketId) throws IOException
+ {
+   BucketsFileSystem bucketsFileSystem = managedStateContext.getBucketsFileSystem();
+   BucketsFileSystem.TimeBucketMeta meta = bucketsFileSystem.getTimeBucketMeta(bucketId, timeBucketId);
+   if (meta == null) {
+     return false;
+   }
+
+   readers.put(timeBucketId, bucketsFileSystem.getReader(bucketId, BucketsFileSystem.getFileName(timeBucketId)));
+   sizeInBytes.getAndAdd(meta.getSizeInBytes());
+   return true;
+ }
+
+ /**
+  * Sets the value of a key in the un-checkpointed state.<br/>
+  * A key which is new to the un-checkpointed state is always stored together with its time bucket. An existing
+  * key is overwritten when the time bucket is the same or newer, so the last write within a time bucket wins;
+  * a write for an older time bucket is ignored.
+  *
+  * @param key key.
+  * @param timeBucket timeBucket of the key.
+  * @param value value of the key.
+  */
+ @Override
+ public void put(Slice key, long timeBucket, Slice value)
+ {
+   BucketedValue bucketedValue = flash.get(key);
+   if (bucketedValue == null) {
+     //a fresh entry takes the time bucket and value right away; creating it empty would leave the value null
+     //whenever the time bucket is not greater than the default of 0.
+     flash.put(key, new BucketedValue(timeBucket, value));
+     //NOTE(review): Long.SIZE is the width of a long in bits (64), not bytes - confirm this is intended.
+     sizeInBytes.getAndAdd(key.length + value.length + Long.SIZE);
+     return;
+   }
+   if (timeBucket >= bucketedValue.getTimeBucket()) {
+     int inc = null == bucketedValue.getValue() ? value.length : value.length - bucketedValue.getValue().length;
+     sizeInBytes.getAndAdd(inc);
+     bucketedValue.setTimeBucket(timeBucket);
+     bucketedValue.setValue(value);
+   }
+ }
+
+ /**
+ * Drops the committed data and the file cache and closes the readers of the time buckets listed in the cached
+ * time bucket metas.
+ *
+ * @return sum of the key and value lengths of the committed data plus the sizes of the time buckets whose readers
+ * were closed. Entries of the file cache are cleared but not counted.
+ * @throws IOException if closing a reader fails.
+ */
+ @Override
+ public long freeMemory() throws IOException
+ {
+ LOG.debug("free space {}", bucketId);
+ long memoryFreed = 0;
+ for (Map.Entry<Slice, BucketedValue> entry : committedData.entrySet()) {
+ memoryFreed += entry.getKey().length + entry.getValue().getValue().length;
+ }
+ committedData.clear();
+ fileCache.clear();
+ //readers are released only for the time buckets in cachedBucketMetas; when it is null (it is reset by
+ //committed) no reader is closed here.
+ if (cachedBucketMetas != null) {
+
+ for (BucketsFileSystem.TimeBucketMeta tbm : cachedBucketMetas) {
+ FileAccess.FileReader reader = readers.remove(tbm.getTimeBucketId());
+ if (reader != null) {
+ memoryFreed += tbm.getSizeInBytes();
+ reader.close();
+ }
+ }
+
+ }
+ sizeInBytes.getAndAdd(-memoryFreed);
+ return memoryFreed;
+ }
+
+ /**
+  * Moves the un-checkpointed data to the check-pointed state under the given window and starts a new, empty
+  * un-checkpointed state.
+  *
+  * @param windowId window id of the checkpoint.
+  * @return the data which was not check-pointed before this call.
+  */
+ @Override
+ public Map<Slice, BucketedValue> checkpoint(long windowId)
+ {
+   Map<Slice, BucketedValue> nonCheckpointed = flash;
+   checkpointedData.put(windowId, nonCheckpointed);
+   //the replacement is a concurrent map like the one the field is initialized with; a plain HashMap here would
+   //silently change the kind of map that reads and writes work on after the first checkpoint.
+   flash = Maps.newConcurrentMap();
+   return nonCheckpointed;
+ }
+
+ /**
+ * Moves the data of all the check-pointed windows up to and including the committed window into the committed
+ * data. For every moved key the file cache entry is dropped, and open readers of the time buckets which the moved
+ * values belong to are closed. The cached time bucket metas are reset at the end.
+ *
+ * @param committedWindowId committed window id
+ */
+ @Override
+ public void committed(long committedWindowId)
+ {
+ Iterator<Map.Entry<Long, Map<Slice, BucketedValue>>> stateIterator = checkpointedData.entrySet().iterator();
+
+ //windows are visited in ascending order, so the loop stops at the first window after the committed one.
+ while (stateIterator.hasNext()) {
+ Map.Entry<Long, Map<Slice, BucketedValue>> entry = stateIterator.next();
+
+ long savedWindow = entry.getKey();
+ if (savedWindow <= committedWindowId) {
+ Map<Slice, BucketedValue> bucketData = entry.getValue();
+
+ //removing any stale values from the file cache
+ for (Slice key : bucketData.keySet()) {
+ fileCache.remove(key);
+ }
+
+ for (BucketedValue bucketedValue : bucketData.values()) {
+ FileAccess.FileReader reader = readers.get(bucketedValue.getTimeBucket());
+ if (reader != null) {
+ //closing the file reader for the time bucket if it is in memory because the time-bucket is modified
+ //so it will be re-written by BucketsDataManager
+ try {
+ LOG.debug("closing reader {} {}", bucketId, bucketedValue.getTimeBucket());
+ reader.close();
+ } catch (IOException e) {
+ throw new RuntimeException("closing reader " + bucketId + ", " + bucketedValue.getTimeBucket(), e);
+ }
+ readers.remove(bucketedValue.getTimeBucket());
+ }
+ //no reader left to close, the remaining values need not be checked.
+ if (readers.isEmpty()) {
+ break;
+ }
+ }
+ committedData.putAll(bucketData);
+ stateIterator.remove();
+ } else {
+ break;
+ }
+ }
+
+ //forces the time bucket metas to be fetched again on the next read from the readers.
+ cachedBucketMetas = null;
+ }
+
+ /**
+ * Registers the recovered data as the check-pointed data of the recovered window.
+ *
+ * @param recoveredWindow recovery window
+ * @param data recovered data
+ */
+ @Override
+ public void recoveredData(long recoveredWindow, Map<Slice, BucketedValue> data)
+ {
+ checkpointedData.put(recoveredWindow, data);
+ }
+
+ /**
+  * Closes all the open readers. A reader which fails to close does not stop the remaining readers from being
+  * closed.
+  *
+  * @throws RuntimeException after all the readers were attempted, if any of them failed to close. The message
+  * lists the affected time buckets, the cause is the first failure and later failures are attached to it as
+  * suppressed exceptions.
+  */
+ @Override
+ public void teardown()
+ {
+   Set<Long> failureBuckets = Sets.newHashSet();
+   IOException firstFailure = null;
+   for (Map.Entry<Long, FileAccess.FileReader> entry : readers.entrySet()) {
+     try {
+       LOG.debug("closing reader {} {}", bucketId, entry.getKey());
+       entry.getValue().close();
+     } catch (IOException e) {
+       //will try to close all readers
+       failureBuckets.add(entry.getKey());
+       if (firstFailure == null) {
+         firstFailure = e;
+       } else {
+         firstFailure.addSuppressed(e);
+       }
+     }
+   }
+   if (!failureBuckets.isEmpty()) {
+     StringBuilder builder = new StringBuilder("teardown of ");
+     builder.append(bucketId).append(" < ");
+     //separate the ids, otherwise time buckets 1 and 23 cannot be told apart from 12 and 3.
+     String separator = "";
+     for (Long timeBucket : failureBuckets) {
+       builder.append(separator).append(timeBucket);
+       separator = ", ";
+     }
+     builder.append(">");
+     throw new RuntimeException(builder.toString(), firstFailure);
+   }
+ }
+
+ /**
+ * @return the live map of time bucket -> open reader; for tests.
+ */
+ @VisibleForTesting
+ Map<Long, FileAccess.FileReader> getReaders()
+ {
+ return readers;
+ }
+
+ /**
+ * @return the live map of committed data; for tests.
+ */
+ @VisibleForTesting
+ Map<Slice, BucketedValue> getCommittedData()
+ {
+ return committedData;
+ }
+
+ /**
+ * @return the live map of window -> check-pointed data; for tests.
+ */
+ @VisibleForTesting
+ ConcurrentSkipListMap<Long, Map<Slice, BucketedValue>> getCheckpointedData()
+ {
+ return checkpointedData;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultBucket.class);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
new file mode 100644
index 0000000..8304fb6
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java
@@ -0,0 +1,566 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentSkipListSet;
+
+import javax.annotation.Nullable;
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Ordering;
+import com.google.common.collect.Sets;
+import com.google.common.collect.Table;
+import com.google.common.collect.TreeBasedTable;
+
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Persists bucket data on disk and maintains meta information about the buckets.
+ * <p/>
+ *
+ * Each bucket has a meta-data file and the format of that is :<br/>
+ * <ol>
+ * <li>version of the meta data (int)</li>
+ * <li>total number of time-buckets (int)</li>
+ * <li>For each time bucket
+ * <ol>
+ * <li>time bucket key (long)</li>
+ * <li>size of data (sum of bytes) (long)</li>
+ * <li>last transferred window id (long)</li>
+ * <li>length of the first key in the time-bucket file (int)</li>
+ * <li>first key in the time-bucket file (byte[])</li>
+ * </ol>
+ * </li>
+ * </ol>
+ * <p/>
+ * Meta data information is updated by {@link IncrementalCheckpointManager}. Any updates are restricted to the package.
+ */
+public class BucketsFileSystem implements ManagedStateComponent
+{
+ static final String META_FILE_NAME = "_META";
+ private static final int META_FILE_VERSION = 1;
+
+ private final transient TreeBasedTable<Long, Long, MutableTimeBucketMeta> timeBucketsMeta = TreeBasedTable.create();
+
+ //Check-pointed set of all buckets this instance has written to.
+ protected final Set<Long> bucketNamesOnFS = new ConcurrentSkipListSet<>();
+
+ protected transient ManagedStateContext managedStateContext;
+
+ /**
+ * Stores the managed state context; all file operations of this class go through its file access.
+ *
+ * @param managedStateContext managed state context; must not be null.
+ */
+ @Override
+ public void setup(@NotNull ManagedStateContext managedStateContext)
+ {
+ this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+ }
+
+ //The methods below delegate one-to-one to the file access of the managed state context. They are protected so
+ //that sub-classes can override how the files of a bucket are accessed.
+
+ protected FileAccess.FileWriter getWriter(long bucketId, String fileName) throws IOException
+ {
+ return managedStateContext.getFileAccess().getWriter(bucketId, fileName);
+ }
+
+ protected FileAccess.FileReader getReader(long bucketId, String fileName) throws IOException
+ {
+ return managedStateContext.getFileAccess().getReader(bucketId, fileName);
+ }
+
+ protected void rename(long bucketId, String fromName, String toName) throws IOException
+ {
+ managedStateContext.getFileAccess().rename(bucketId, fromName, toName);
+ }
+
+ protected DataOutputStream getOutputStream(long bucketId, String fileName) throws IOException
+ {
+ return managedStateContext.getFileAccess().getOutputStream(bucketId, fileName);
+ }
+
+ protected DataInputStream getInputStream(long bucketId, String fileName) throws IOException
+ {
+ return managedStateContext.getFileAccess().getInputStream(bucketId, fileName);
+ }
+
+ protected boolean exists(long bucketId, String fileName) throws IOException
+ {
+ return managedStateContext.getFileAccess().exists(bucketId, fileName);
+ }
+
+ protected RemoteIterator<LocatedFileStatus> listFiles(long bucketId) throws IOException
+ {
+ return managedStateContext.getFileAccess().listFiles(bucketId);
+ }
+
+ protected void delete(long bucketId, String fileName) throws IOException
+ {
+ managedStateContext.getFileAccess().delete(bucketId, fileName);
+ }
+
+ protected void deleteBucket(long bucketId) throws IOException
+ {
+ managedStateContext.getFileAccess().deleteBucket(bucketId);
+ }
+
+ /**
+  * Saves data to a bucket. The data consists of key/values of all time-buckets of a particular bucket.<br/>
+  * Every time bucket is written to a temporary file which is then renamed to the time bucket file. When the time
+  * bucket file already exists, its content is read first and merged with the new data, the new data winning.
+  * The reader and the writer are closed even when reading or writing fails; the rename and the meta update only
+  * happen after a successful write.
+  *
+  * @param windowId window id
+  * @param bucketId bucket id
+  * @param data data of all time-buckets
+  * @throws IOException
+  */
+ protected void writeBucketData(long windowId, long bucketId, Map<Slice, Bucket.BucketedValue> data) throws IOException
+ {
+   Table<Long, Slice, Bucket.BucketedValue> timeBucketedKeys = TreeBasedTable.create(Ordering.<Long>natural(),
+       managedStateContext.getKeyComparator());
+
+   for (Map.Entry<Slice, Bucket.BucketedValue> entry : data.entrySet()) {
+     long timeBucketId = entry.getValue().getTimeBucket();
+     timeBucketedKeys.put(timeBucketId, entry.getKey(), entry.getValue());
+   }
+
+   for (long timeBucket : timeBucketedKeys.rowKeySet()) {
+     BucketsFileSystem.MutableTimeBucketMeta tbm = getOrCreateTimeBucketMeta(bucketId, timeBucket);
+     addBucketName(bucketId);
+
+     //complete content of the time bucket file, sorted by key
+     TreeMap<Slice, Slice> fileData = new TreeMap<>(managedStateContext.getKeyComparator());
+     if (tbm.getLastTransferredWindowId() != -1) {
+       //the time bucket existed so we need to read the file and then re-write it
+       FileAccess.FileReader fileReader = getReader(bucketId, getFileName(timeBucket));
+       try {
+         fileReader.readFully(fileData);
+       } finally {
+         fileReader.close();
+       }
+     }
+     for (Map.Entry<Slice, Bucket.BucketedValue> entry : timeBucketedKeys.row(timeBucket).entrySet()) {
+       fileData.put(entry.getKey(), entry.getValue().getValue());
+     }
+
+     long dataSize = 0;
+     String tmpFileName = getTmpFileName();
+     FileAccess.FileWriter fileWriter = getWriter(bucketId, tmpFileName);
+     try {
+       for (Map.Entry<Slice, Slice> entry : fileData.entrySet()) {
+         Slice key = entry.getKey();
+         Slice value = entry.getValue();
+
+         dataSize += key.length;
+         dataSize += value.length;
+
+         fileWriter.append(key.toByteArray(), value.toByteArray());
+       }
+     } finally {
+       fileWriter.close();
+     }
+     rename(bucketId, tmpFileName, getFileName(timeBucket));
+     //a time bucket row always has at least one key, so the file data is never empty here.
+     tbm.updateTimeBucketMeta(windowId, dataSize, fileData.firstKey());
+   }
+
+   updateBucketMetaFile(bucketId);
+ }
+
+ /**
+ * Retrieves the time bucket meta of a particular time-bucket. If the time bucket doesn't exist then a new one
+ * is created and registered. The lookup and the creation happen under the lock on the time bucket metas.
+ *
+ * @param bucketId bucket id
+ * @param timeBucketId time bucket id
+ * @return time bucket meta of the time bucket
+ * @throws IOException
+ */
+ @NotNull
+ MutableTimeBucketMeta getOrCreateTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
+ {
+ synchronized (timeBucketsMeta) {
+ MutableTimeBucketMeta tbm = timeBucketMetaHelper(bucketId, timeBucketId);
+ if (tbm == null) {
+ tbm = new MutableTimeBucketMeta(bucketId, timeBucketId);
+ timeBucketsMeta.put(bucketId, timeBucketId, tbm);
+ }
+ return tbm;
+ }
+ }
+
+ /**
+ * Remembers that this instance has written to the bucket.
+ *
+ * @param bucketId bucket id
+ */
+ protected void addBucketName(long bucketId)
+ {
+ bucketNamesOnFS.add(bucketId);
+ }
+
+ /**
+  * Returns the time bucket meta of a particular time-bucket which is immutable.
+  *
+  * @param bucketId bucket id
+  * @param timeBucketId time bucket id
+  * @return immutable time bucket meta; null when there is no meta for the time bucket.
+  * @throws IOException
+  */
+ @Nullable
+ public TimeBucketMeta getTimeBucketMeta(long bucketId, long timeBucketId) throws IOException
+ {
+   synchronized (timeBucketsMeta) {
+     MutableTimeBucketMeta mutableMeta = timeBucketMetaHelper(bucketId, timeBucketId);
+     return mutableMeta == null ? null : mutableMeta.getImmutableTimeBucketMeta();
+   }
+ }
+
+ /**
+  * Looks up the time bucket meta in memory and, on a miss, loads the meta file of the bucket if it exists and
+  * looks up again.<br/>
+  * This should be entered only after acquiring the lock on {@link #timeBucketsMeta}
+  *
+  * @param bucketId bucket id
+  * @param timeBucketId time bucket id
+  * @return Mutable time bucket meta for a bucket id and time bucket id; null when it is neither in memory nor in
+  * the meta file.
+  * @throws IOException
+  */
+ private MutableTimeBucketMeta timeBucketMetaHelper(long bucketId, long timeBucketId) throws IOException
+ {
+   MutableTimeBucketMeta inMemory = timeBucketsMeta.get(bucketId, timeBucketId);
+   if (inMemory != null) {
+     return inMemory;
+   }
+   if (!exists(bucketId, META_FILE_NAME)) {
+     return null;
+   }
+   try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) {
+     //Load meta info of all the time buckets of the bucket identified by bucketId.
+     loadBucketMetaFile(bucketId, dis);
+   }
+   return timeBucketsMeta.get(bucketId, timeBucketId);
+ }
+
+ /**
+  * Returns the meta information of all the time buckets in the bucket in descending order - latest to oldest.
+  * When no meta of the bucket is in memory, the meta file of the bucket is loaded first if it exists.
+  *
+  * @param bucketId bucket id
+  * @return all the time buckets in order - latest to oldest; empty when the bucket has no time buckets.
+  * @throws IOException
+  */
+ public TreeSet<TimeBucketMeta> getAllTimeBuckets(long bucketId) throws IOException
+ {
+   synchronized (timeBucketsMeta) {
+     if (!timeBucketsMeta.containsRow(bucketId) && exists(bucketId, META_FILE_NAME)) {
+       try (DataInputStream dis = getInputStream(bucketId, META_FILE_NAME)) {
+         //Load meta info of all the time buckets of the bucket identified by bucket id
+         loadBucketMetaFile(bucketId, dis);
+       }
+     }
+
+     TreeSet<TimeBucketMeta> latestFirst = Sets.newTreeSet(Collections.<TimeBucketMeta>reverseOrder());
+     for (MutableTimeBucketMeta mutableMeta : timeBucketsMeta.row(bucketId).values()) {
+       latestFirst.add(mutableMeta.getImmutableTimeBucketMeta());
+     }
+     return latestFirst;
+   }
+ }
+
+ /**
+  * Loads the bucket meta-file and registers a time bucket meta for each of its entries. A meta-file whose
+  * version differs from {@link #META_FILE_VERSION} is left unread.<br/>
+  * This should be entered only after acquiring the lock on {@link #timeBucketsMeta}.
+  *
+  * @param bucketId bucket id
+  * @param dis data input stream
+  * @throws IOException
+  */
+ private void loadBucketMetaFile(long bucketId, DataInputStream dis) throws IOException
+ {
+   if (dis.readInt() != META_FILE_VERSION) {
+     //nothing is loaded from a meta file of any other version
+     return;
+   }
+
+   for (int remaining = dis.readInt(); remaining > 0; remaining--) {
+     long timeBucketId = dis.readLong();
+     long dataSize = dis.readLong();
+     long lastTransferredWindow = dis.readLong();
+
+     byte[] firstKeyBytes = new byte[dis.readInt()];
+     dis.readFully(firstKeyBytes, 0, firstKeyBytes.length);
+
+     MutableTimeBucketMeta loaded = new MutableTimeBucketMeta(bucketId, timeBucketId);
+     loaded.updateTimeBucketMeta(lastTransferredWindow, dataSize, new Slice(firstKeyBytes));
+     timeBucketsMeta.put(bucketId, timeBucketId, loaded);
+   }
+ }
+
+ /**
+  * Saves the updated bucket meta on disk. The meta of all the time buckets of the bucket is written to a
+  * temporary file which is then renamed to the meta file; both steps happen under the lock on the time bucket
+  * metas.
+  *
+  * @param bucketId bucket id
+  * @throws IOException
+  */
+ void updateBucketMetaFile(long bucketId) throws IOException
+ {
+   synchronized (timeBucketsMeta) {
+     Map<Long, MutableTimeBucketMeta> timeBuckets = Preconditions.checkNotNull(timeBucketsMeta.row(bucketId),
+         "timeBuckets");
+     String tmpFileName = getTmpFileName();
+
+     try (DataOutputStream dos = getOutputStream(bucketId, tmpFileName)) {
+       dos.writeInt(META_FILE_VERSION);
+       dos.writeInt(timeBuckets.size());
+       for (MutableTimeBucketMeta tbm : timeBuckets.values()) {
+         dos.writeLong(tbm.getTimeBucketId());
+         dos.writeLong(tbm.getSizeInBytes());
+         dos.writeLong(tbm.getLastTransferredWindowId());
+
+         Slice firstKey = tbm.getFirstKey();
+         dos.writeInt(firstKey.length);
+         dos.write(firstKey.toByteArray());
+       }
+     }
+     rename(bucketId, tmpFileName, META_FILE_NAME);
+   }
+ }
+
+ /**
+  * Deletes the files of all the time buckets which are less than or equal to the given time bucket, in every
+  * bucket this instance has written to. The meta of a deleted time bucket is invalidated before its file is
+  * deleted. A bucket in which no newer time bucket file is left is deleted as a whole.
+  *
+  * @param latestExpiredTimeBucket latest time bucket which has expired.
+  * @throws IOException
+  */
+ protected void deleteTimeBucketsLessThanEqualTo(long latestExpiredTimeBucket) throws IOException
+ {
+   LOG.debug("delete files before {}", latestExpiredTimeBucket);
+
+   for (long bucketName : bucketNamesOnFS) {
+     RemoteIterator<LocatedFileStatus> timeBucketsIterator = listFiles(bucketName);
+     boolean emptyBucket = true;
+     while (timeBucketsIterator.hasNext()) {
+       LocatedFileStatus timeBucketStatus = timeBucketsIterator.next();
+
+       String timeBucketStr = timeBucketStatus.getPath().getName();
+       if (timeBucketStr.equals(BucketsFileSystem.META_FILE_NAME) || timeBucketStr.endsWith(".tmp")) {
+         //ignoring meta and tmp files
+         continue;
+       }
+       long timeBucket = Long.parseLong(timeBucketStr);
+
+       if (timeBucket <= latestExpiredTimeBucket) {
+         //both placeholders need an argument: the bucket first, then the time bucket.
+         LOG.debug("deleting bucket {} time-bucket {}", bucketName, timeBucket);
+         invalidateTimeBucket(bucketName, timeBucket);
+         delete(bucketName, timeBucketStr);
+       } else {
+         emptyBucket = false;
+       }
+     }
+     if (emptyBucket) {
+       LOG.debug("deleting bucket {}", bucketName);
+       deleteBucket(bucketName);
+     }
+   }
+ }
+
+ /**
+ * Removes the meta of a time bucket from memory and re-writes the meta file of the bucket without it.
+ *
+ * @param bucketId bucket id
+ * @param timeBucketId time bucket id
+ * @throws IOException
+ */
+ void invalidateTimeBucket(long bucketId, long timeBucketId) throws IOException
+ {
+ synchronized (timeBucketsMeta) {
+ timeBucketsMeta.remove(bucketId, timeBucketId);
+ }
+ updateBucketMetaFile(bucketId);
+ }
+
+ /**
+ * Nothing to release: this class keeps no open resource between calls.
+ */
+ @Override
+ public void teardown()
+ {
+ }
+
+ /**
+  * Read-only view of the meta information of a time bucket which serves the readers -
+  * {@link Bucket.DefaultBucket}.
+  * It exposes only getters and is accessible outside the package unlike {@link MutableTimeBucketMeta}.<br/>
+  * Identity and ordering are defined by the bucket id and the time bucket id alone.
+  */
+ public static class TimeBucketMeta implements Comparable<TimeBucketMeta>
+ {
+   private final long bucketId;
+   private final long timeBucketId;
+   private long lastTransferredWindowId = -1;
+   private long sizeInBytes;
+   private Slice firstKey;
+
+   private TimeBucketMeta()
+   {
+     //for kryo
+     bucketId = -1;
+     timeBucketId = -1;
+   }
+
+   private TimeBucketMeta(long bucketId, long timeBucketId)
+   {
+     this.bucketId = bucketId;
+     this.timeBucketId = timeBucketId;
+   }
+
+   /**
+    * @return window id passed to the last update of this meta; -1 when it was never updated.
+    */
+   public long getLastTransferredWindowId()
+   {
+     return lastTransferredWindowId;
+   }
+
+   public long getSizeInBytes()
+   {
+     return this.sizeInBytes;
+   }
+
+   public long getBucketId()
+   {
+     return bucketId;
+   }
+
+   public long getTimeBucketId()
+   {
+     return timeBucketId;
+   }
+
+   /**
+    * @return first key in the time bucket file.
+    */
+   public Slice getFirstKey()
+   {
+     return firstKey;
+   }
+
+   @Override
+   public boolean equals(Object o)
+   {
+     if (this == o) {
+       return true;
+     }
+     if (o instanceof TimeBucketMeta) {
+       TimeBucketMeta other = (TimeBucketMeta)o;
+       return bucketId == other.bucketId && timeBucketId == other.timeBucketId;
+     }
+     return false;
+   }
+
+   @Override
+   public int hashCode()
+   {
+     return Objects.hash(bucketId, timeBucketId);
+   }
+
+   /**
+    * Orders by bucket id first and by time bucket id second, both ascending.
+    */
+   @Override
+   public int compareTo(@NotNull TimeBucketMeta o)
+   {
+     if (bucketId != o.bucketId) {
+       return bucketId < o.bucketId ? -1 : 1;
+     }
+     if (timeBucketId != o.timeBucketId) {
+       return timeBucketId < o.timeBucketId ? -1 : 1;
+     }
+     return 0;
+   }
+ }
+
+ /**
+ * Represents time bucket meta information which can be changed.
+ * The updates to an instance and read/creation of {@link #immutableTimeBucketMeta} belonging to it are synchronized
+ * as different threads are updating and reading from it.<br/>
+ *
+ * The instance is updated when data from window files are transferred to bucket files and
+ * {@link Bucket.DefaultBucket} reads the immutable time bucket meta.
+ */
+ static class MutableTimeBucketMeta extends TimeBucketMeta
+ {
+ //snapshot handed out to the readers; re-created lazily after an update.
+ private transient TimeBucketMeta immutableTimeBucketMeta;
+
+ //set by an update, cleared when the snapshot is re-created.
+ private volatile boolean changed;
+
+ public MutableTimeBucketMeta(long bucketId, long timeBucketId)
+ {
+ super(bucketId, timeBucketId);
+ }
+
+ /**
+ * Updates the meta information and marks the snapshot as outdated.
+ *
+ * @param lastTransferredWindow window id of the data which was last transferred to the time bucket file.
+ * @param bytes size of the time bucket data in bytes.
+ * @param firstKey first key in the time bucket file; must not be null.
+ */
+ synchronized void updateTimeBucketMeta(long lastTransferredWindow, long bytes, @NotNull Slice firstKey)
+ {
+ changed = true;
+ super.lastTransferredWindowId = lastTransferredWindow;
+ super.sizeInBytes = bytes;
+ super.firstKey = Preconditions.checkNotNull(firstKey, "first key");
+ }
+
+ /**
+ * Returns a snapshot of this meta. The same snapshot instance is returned until the next update.
+ *
+ * @return snapshot of the current meta information.
+ */
+ synchronized TimeBucketMeta getImmutableTimeBucketMeta()
+ {
+ if (immutableTimeBucketMeta == null || changed) {
+
+ immutableTimeBucketMeta = new TimeBucketMeta(getBucketId(), getTimeBucketId());
+ immutableTimeBucketMeta.lastTransferredWindowId = getLastTransferredWindowId();
+ immutableTimeBucketMeta.sizeInBytes = getSizeInBytes();
+ immutableTimeBucketMeta.firstKey = getFirstKey();
+ changed = false;
+ }
+ return immutableTimeBucketMeta;
+ }
+
+ }
+
+ /**
+ * @param timeBucketId time bucket id
+ * @return name of the time bucket file, which is the decimal form of the time bucket id.
+ */
+ protected static String getFileName(long timeBucketId)
+ {
+ return Long.toString(timeBucketId);
+ }
+
+ /**
+ * @return name for a temporary file: the current time in milliseconds followed by ".tmp".
+ */
+ //NOTE(review): two calls within the same millisecond return the same name - confirm that callers writing to the
+ //same bucket never overlap.
+ protected static String getTmpFileName()
+ {
+ return System.currentTimeMillis() + ".tmp";
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(BucketsFileSystem.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
new file mode 100644
index 0000000..a372163
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+
+import javax.validation.constraints.NotNull;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Throwables;
+import com.google.common.collect.Queues;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.common.util.NameableThreadFactory;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Manages state which is written to files by windows. The state from the window files is then transferred to bucket
+ * data files. This class listens to time expiry events issued by {@link TimeBucketAssigner}.
+ *
+ * This component is also responsible for purging old time buckets.
+ */
+public class IncrementalCheckpointManager extends WindowDataManager.FSWindowDataManager
+ implements ManagedStateComponent
+{
+ private static final String WAL_RELATIVE_PATH = "managed_state";
+
+ //windowId => (bucketId => data)
+ private final transient Map<Long, Map<Long, Map<Slice, Bucket.BucketedValue>>> savedWindows = new
+ ConcurrentSkipListMap<>();
+
+ private transient ExecutorService writerService;
+ private transient volatile boolean transfer;
+
+ private final transient LinkedBlockingQueue<Long> windowsToTransfer = Queues.newLinkedBlockingQueue();
+ private final transient AtomicReference<Throwable> throwable = new AtomicReference<>();
+
+ protected transient ManagedStateContext managedStateContext;
+
+ private final transient AtomicLong latestExpiredTimeBucket = new AtomicLong(-1);
+
+ private transient int waitMillis;
+
+
+ /**
+  * Creates the manager and sets its recovery path to {@link #WAL_RELATIVE_PATH}.
+  */
+ public IncrementalCheckpointManager()
+ {
+   // the no-argument superclass constructor is invoked implicitly
+   setRecoveryPath(WAL_RELATIVE_PATH);
+ }
+
+ /**
+  * Not supported; this manager is set up through {@link #setup(ManagedStateContext)}.
+  *
+  * @param context operator context.
+  * @throws UnsupportedOperationException always.
+  */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+   final String reason = "not supported";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Sets up this manager with the managed state context and starts the single-threaded writer task.
+  * <p/>
+  * While {@code transfer} is true the writer task repeatedly calls {@link #transferWindowFiles()} and, whenever
+  * an expired time bucket has been announced via {@link #setLatestExpiredTimeBucket(long)}, asks the buckets
+  * file system to delete all time buckets less than or equal to it. Any failure of that purge is recorded so
+  * that the next call to {@code save} re-throws it on the caller's thread.
+  *
+  * @param managedStateContext managed state context; must not be null.
+  */
+ @Override
+ public void setup(@NotNull final ManagedStateContext managedStateContext)
+ {
+   this.managedStateContext = Preconditions.checkNotNull(managedStateContext, "managed state context");
+   waitMillis = managedStateContext.getOperatorContext().getValue(Context.OperatorContext.SPIN_MILLIS);
+   super.setup(managedStateContext.getOperatorContext());
+
+   writerService = Executors.newSingleThreadExecutor(new NameableThreadFactory("managed-state-writer"));
+   transfer = true;
+   writerService.submit(new Runnable()
+   {
+     @Override
+     public void run()
+     {
+       while (transfer) {
+         transferWindowFiles();
+         if (latestExpiredTimeBucket.get() > -1) {
+           try {
+             // getAndSet(-1) consumes the request, so a newer value set meanwhile is the one that is purged
+             managedStateContext.getBucketsFileSystem().deleteTimeBucketsLessThanEqualTo(
+                 latestExpiredTimeBucket.getAndSet(-1));
+           } catch (IOException | RuntimeException e) {
+             // The Future returned by submit() is discarded, so an unrecorded runtime failure would end
+             // this task silently; record every failure for save() to report.
+             throwable.set(e);
+             LOG.debug("delete files", e);
+             Throwables.propagate(e);
+           }
+         }
+       }
+     }
+   });
+ }
+
+ /**
+  * Takes one window id from the transfer queue, if any, and writes the data saved for that window to the
+  * buckets file system, bucket by bucket; afterwards the window's entry is deleted through the storage agent.
+  * When the queue is empty the calling thread sleeps for {@code waitMillis}.
+  * <p/>
+  * A window id which no longer has saved data (for example because it was queued more than once) is skipped.
+  * Any failure during the transfer is recorded in {@code throwable} and re-thrown.
+  */
+ protected void transferWindowFiles()
+ {
+   try {
+     Long windowId = windowsToTransfer.poll();
+     if (windowId == null) {
+       Thread.sleep(waitMillis);
+       return;
+     }
+     try {
+       LOG.debug("transfer window {}", windowId);
+       //bucket id => bucket data(key => value, time-buckets)
+       Map<Long, Map<Slice, Bucket.BucketedValue>> buckets = savedWindows.remove(windowId);
+       if (buckets == null) {
+         // committed() re-queues every saved window that is still pending, so the same id can be polled
+         // twice; the first poll already transferred and removed the data.
+         LOG.debug("window {} already transferred", windowId);
+         return;
+       }
+
+       for (Map.Entry<Long, Map<Slice, Bucket.BucketedValue>> singleBucket : buckets.entrySet()) {
+         managedStateContext.getBucketsFileSystem().writeBucketData(windowId, singleBucket.getKey(),
+             singleBucket.getValue());
+       }
+       storageAgent.delete(managedStateContext.getOperatorContext().getId(), windowId);
+     } catch (Throwable t) {
+       throwable.set(t);
+       LOG.debug("transfer window {}", windowId, t);
+       Throwables.propagate(t);
+     }
+   } catch (InterruptedException ex) {
+     //sleep can be interrupted by teardown so no need to re-throw interrupt exception
+     LOG.debug("interrupted", ex);
+   }
+ }
+
+ /**
+  * Not supported; use {@link #save(Map, int, long, boolean)} instead.
+  *
+  * @throws UnsupportedOperationException always.
+  */
+ @Override
+ public void save(Object object, int operatorId, long windowId) throws IOException
+ {
+   final String reason = "doesn't support saving any object";
+   throw new UnsupportedOperationException(reason);
+ }
+
+ /**
+  * Registers the un-saved data under the given window id and, unless told otherwise, also persists it through
+  * the superclass. The un-saved state combines data received in multiple windows.
+  * <p/>
+  * A failure recorded earlier by the writer thread is logged and re-thrown before anything is saved.
+  *
+  * @param unsavedData           un-saved data of all buckets.
+  * @param operatorId            operator id.
+  * @param windowId              window id.
+  * @param skipWriteToWindowFile when true the data is only kept in memory and the window file is not written.
+  * @throws IOException declared for the superclass save that writes the window file.
+  */
+ public void save(Map<Long, Map<Slice, Bucket.BucketedValue>> unsavedData, int operatorId, long windowId,
+     boolean skipWriteToWindowFile)
+     throws IOException
+ {
+   final Throwable transferFailure = throwable.get();
+   if (transferFailure != null) {
+     LOG.error("Error while transferring");
+     Throwables.propagate(transferFailure);
+   }
+
+   savedWindows.put(windowId, unsavedData);
+   if (skipWriteToWindowFile) {
+     return;
+   }
+   super.save(unsavedData, operatorId, windowId);
+ }
+
+
+
+ /**
+  * Queues every saved window with an id less than or equal to the committed window id for transfer to the
+  * data files. The transfer itself is performed by {@link #transferWindowFiles()}.
+  *
+  * @param operatorId operator id
+  * @param windowId   committed window id
+  */
+ @SuppressWarnings("UnusedParameters")
+ protected void committed(int operatorId, long windowId) throws IOException, InterruptedException
+ {
+   LOG.debug("data manager committed {}", windowId);
+   // savedWindows is a ConcurrentSkipListMap, so its keys are visited in ascending order and the scan can
+   // stop at the first window which is newer than the committed one.
+   for (Long currentWindow : savedWindows.keySet()) {
+     if (currentWindow > windowId) {
+       break;
+     }
+     // log the window that is actually queued, not the committed window id
+     LOG.debug("to transfer {}", currentWindow);
+     windowsToTransfer.add(currentWindow);
+   }
+ }
+
+ /**
+  * Tears down the superclass and stops the writer task. The writer is stopped even when the superclass
+  * teardown throws, and the call is safe when {@link #setup(ManagedStateContext)} was never invoked.
+  */
+ @Override
+ public void teardown()
+ {
+   try {
+     super.teardown();
+   } finally {
+     // clear the flag first so the writer loop exits once shutdownNow() interrupts its sleep
+     transfer = false;
+     if (writerService != null) {
+       writerService.shutdownNow();
+     }
+   }
+ }
+
+ /**
+  * Records the latest expired time bucket. A value greater than -1 makes the writer task started in
+  * {@link #setup(ManagedStateContext)} delete all time buckets less than or equal to it.
+  *
+  * @param timeBucket latest expired time bucket.
+  */
+ public void setLatestExpiredTimeBucket(long timeBucket)
+ {
+   this.latestExpiredTimeBucket.set(timeBucket);
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(IncrementalCheckpointManager.class);
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
new file mode 100644
index 0000000..12928f1
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedState.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.apex.malhar.lib.state.managed;
+
+/**
+ * Managed state has a limit on amount of data in memory.
+ */
+public interface ManagedState
+{
+ /**
+ * Sets the maximum memory size.
+ * @param bytes max size in bytes.
+ */
+ void setMaxMemorySize(long bytes);
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
new file mode 100644
index 0000000..1044e15
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateComponent.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * A component of managed state which is set up with a {@link ManagedStateContext} and torn down afterwards.
+ */
+public interface ManagedStateComponent
+{
+ /**
+ * Callback to setup using managed state context
+ *
+ * @param managedStateContext managed state context; annotated as not null.
+ */
+ void setup(@NotNull ManagedStateContext managedStateContext);
+
+ /**
+ * Callback to perform teardown.
+ */
+ void teardown();
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
new file mode 100644
index 0000000..406fdbd
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateContext.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.Comparator;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.fileaccess.FileAccess;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Context handed to a {@link ManagedStateComponent} during setup. It gives the component access to the
+ * collaborators listed below.
+ */
+public interface ManagedStateContext
+{
+ /**
+ * @return the file access instance.
+ */
+ FileAccess getFileAccess();
+
+ /**
+ * @return the operator context.
+ */
+ Context.OperatorContext getOperatorContext();
+
+ /**
+ * @return the comparator for keys, which are {@link Slice}s.
+ */
+ Comparator<Slice> getKeyComparator();
+
+ /**
+ * @return the buckets file system.
+ */
+ BucketsFileSystem getBucketsFileSystem();
+
+ /**
+ * @return the time bucket assigner.
+ */
+ TimeBucketAssigner getTimeBucketAssigner();
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/a8fbcac6/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
new file mode 100644
index 0000000..4c3cf84
--- /dev/null
+++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImpl.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.state.managed;
+
+import java.util.concurrent.Future;
+
+import javax.validation.constraints.Min;
+import javax.validation.constraints.NotNull;
+
+import org.apache.apex.malhar.lib.state.BucketedState;
+
+import com.datatorrent.api.Context;
+import com.datatorrent.netlet.util.Slice;
+
+/**
+ * Basic implementation of {@link AbstractManagedStateImpl} where the system time corresponding to an application
+ * window is used to sub-group the keys of a particular bucket.<br/>
+ */
+public class ManagedStateImpl extends AbstractManagedStateImpl implements BucketedState
+{
+ private long time = System.currentTimeMillis();
+ private transient long timeIncrement;
+
+ /**
+  * Creates the state with a single bucket; the count can be changed with {@link #setNumBuckets(int)}.
+  */
+ public ManagedStateImpl()
+ {
+   numBuckets = 1;
+ }
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ super.setup(context);
+ timeIncrement = context.getValue(Context.OperatorContext.APPLICATION_WINDOW_COUNT) *
+ context.getValue(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS);
+ }
+
+ /**
+  * Puts the key/value pair in the given bucket, under the time bucket which the time bucket assigner returns
+  * for the current value of {@code time}.
+  *
+  * @param bucketId bucket id.
+  * @param key      key.
+  * @param value    value.
+  */
+ @Override
+ public void put(long bucketId, @NotNull Slice key, @NotNull Slice value)
+ {
+   putInBucket(bucketId, timeBucketAssigner.getTimeBucketFor(time), key, value);
+ }
+
+ /**
+  * Looks up the value of the key in the given bucket synchronously. The lookup is delegated with a time
+  * bucket argument of -1 (presumably meaning that no particular time bucket is targeted; confirm against
+  * {@link AbstractManagedStateImpl}).
+  *
+  * @param bucketId bucket id.
+  * @param key      key.
+  * @return the result of the delegated lookup.
+  */
+ @Override
+ public Slice getSync(long bucketId, @NotNull Slice key)
+ {
+   final Slice found = getValueFromBucketSync(bucketId, -1, key);
+   return found;
+ }
+
+ /**
+  * Returns the future using which the value is obtained.<br/>
+  * If the key is present in the bucket cache, then the future has its value set when constructed;
+  * if not, the value is set later, after it has been read from the data files.
+  *
+  * @param bucketId bucket id
+  * @param key      key
+  * @return future which provides the value of the key.
+  */
+ @Override
+ public Future<Slice> getAsync(long bucketId, @NotNull Slice key)
+ {
+   final Future<Slice> pending = getValueFromBucketAsync(bucketId, -1, key);
+   return pending;
+ }
+
+ /**
+  * Ends the window in the superclass and then advances {@code time} by the increment computed in setup.
+  */
+ @Override
+ public void endWindow()
+ {
+   super.endWindow();
+   time = time + timeIncrement;
+ }
+
+ /**
+  * Returns the number of buckets; the value is constrained to be at least 1.
+  *
+  * @return number of buckets.
+  */
+ @Min(1)
+ @Override
+ public int getNumBuckets()
+ {
+   return this.numBuckets;
+ }
+
+ /**
+ * Sets the number of buckets. The getter {@link #getNumBuckets()} carries a {@code @Min(1)} constraint;
+ * this setter itself performs no check.
+ *
+ * @param numBuckets number of buckets
+ */
+ public void setNumBuckets(int numBuckets)
+ {
+ this.numBuckets = numBuckets;
+ }
+}