You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by br...@apache.org on 2015/11/19 00:05:52 UTC
[1/2] incubator-apex-malhar git commit: MLHR-1902 #comment renamed
IdempotentStorageManager to WindowDataManager
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/devel-3 2f97e7a40 -> a4b999de1
MLHR-1902 #comment renamed IdempotentStorageManager to WindowDataManager
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/dae30a03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/dae30a03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/dae30a03
Branch: refs/heads/devel-3
Commit: dae30a03e53bebc404a3852c2c8d23c7619c5fd1
Parents: 2f97e7a
Author: Chandni Singh <cs...@apache.org>
Authored: Wed Nov 18 13:26:21 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Nov 18 14:17:17 2015 -0800
----------------------------------------------------------------------
.../lib/io/IdempotentStorageManager.java | 3 +-
.../datatorrent/lib/util/WindowDataManager.java | 404 +++++++++++++++++++
.../lib/io/IdempotentStorageManagerTest.java | 1 +
.../lib/util/WindowDataManagerTest.java | 188 +++++++++
4 files changed, 595 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
index 545398e..dae417d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
+++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
@@ -54,8 +54,9 @@ import com.datatorrent.common.util.FSStorageAgent;
* application window boundaries.
*
* @since 2.0.0
+ * @deprecated use {@link com.datatorrent.lib.util.WindowDataManager}
*/
-
+@Deprecated
public interface IdempotentStorageManager extends StorageAgent, Component<Context.OperatorContext>
{
/**
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
new file mode 100644
index 0000000..26a2e32
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window.
+ * An idempotent agent cannot make any guarantees about the tuples emitted in the application window which fails.
+ *
+ * The order of tuples is guaranteed for ordered input sources.
+ *
+ * <b>Important:</b> In order for an idempotent storage manager to function correctly it cannot allow
+ * checkpoints to occur within an application window and checkpoints must be aligned with
+ * application window boundaries.
+ *
+ * @since 2.0.0
+ */
+public interface WindowDataManager extends StorageAgent, Component<Context.OperatorContext>
+{
+ /**
+ * Gets the largest window for which there is recovery data.
+ */
+ long getLargestRecoveryWindow();
+
+ /**
+ * When an operator can partition itself dynamically then there is no guarantee that an input state which was being
+ * handled by one instance previously will be handled by the same instance after partitioning. <br/>
+ * For example, an {@link AbstractFileInputOperator} instance which reads a File X till offset l (not check-pointed) may no
+ * longer be the instance that handles file X after repartitioning as no. of instances may have changed and file X
+ * is re-hashed to another instance. <br/>
+ * The new instance wouldn't know from what point to read the File X unless it reads the idempotent storage of all the
+ * operators for the window being replayed and fixes its state.
+ *
+ * @param windowId window id.
+ * @return mapping of operator id to the corresponding state
+ * @throws IOException
+ */
+ Map<Integer, Object> load(long windowId) throws IOException;
+
+ /**
+ * Delete the artifacts of the operator for windows <= windowId.
+ *
+ * @param operatorId operator id
+ * @param windowId window id
+ * @throws IOException
+ */
+ void deleteUpTo(int operatorId, long windowId) throws IOException;
+
+ /**
+ * This informs the idempotent storage manager that operator is partitioned so that it can set properties and
+ * distribute state.
+ *
+ * @param newManagers all the new idempotent storage managers.
+ * @param removedOperatorIds set of operator ids which were removed after partitioning.
+ */
+ void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds);
+
+ /**
+ * A {@link WindowDataManager} that uses FS to persist state.
+ * <p/>
+ * State is written through an {@link FSStorageAgent} rooted at
+ * <code>&lt;application path&gt;/&lt;recoveryPath&gt;</code>. {@link #setup(Context.OperatorContext)} scans that
+ * directory to build an index of which operators have state for which window.
+ */
+ class FSWindowDataManager implements WindowDataManager
+ {
+ private static final String DEF_RECOVERY_PATH = "idempotentState";
+
+ protected transient FSStorageAgent storageAgent;
+
+ /**
+ * Recovery path relative to app path where state is saved.
+ */
+ @NotNull
+ private String recoveryPath;
+
+ /**
+ * largest window for which there is recovery data across all physical operator instances.
+ */
+ protected transient long largestRecoveryWindow;
+
+ /**
+ * This is not null only for one physical instance.<br/>
+ * It consists of operator ids which have been deleted but have some state that can be replayed.
+ * Only one of the instances would be handling (modifying) the files that belong to this state.
+ */
+ protected Set<Integer> deletedOperators;
+
+ /**
+ * Sorted mapping from window id to all the operators that have state to replay for that window.
+ */
+ protected final transient TreeMultimap<Long, Integer> replayState;
+
+ protected transient FileSystem fs;
+ protected transient Path appPath;
+
+ public FSWindowDataManager()
+ {
+ replayState = TreeMultimap.create();
+ largestRecoveryWindow = Stateless.WINDOW_ID;
+ recoveryPath = DEF_RECOVERY_PATH;
+ }
+
+ /**
+ * Creates the storage agent and file system for the recovery directory and indexes the state found there.
+ * <p/>
+ * Every entry directly under the recovery directory is parsed as an operator id and every file inside it as a
+ * hexadecimal window id; files ending with {@link FSStorageAgent#TMP_FILE} are skipped. The largest window id
+ * seen becomes the largest recovery window.
+ *
+ * @param context operator context that supplies {@link DAG#APPLICATION_PATH}.
+ */
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ Configuration configuration = new Configuration();
+ appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+
+ try {
+ storageAgent = new FSStorageAgent(appPath.toString(), configuration);
+
+ fs = FileSystem.newInstance(appPath.toUri(), configuration);
+
+ if (fs.exists(appPath)) {
+ FileStatus[] fileStatuses = fs.listStatus(appPath);
+
+ for (FileStatus operatorDirStatus : fileStatuses) {
+ int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
+
+ for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
+ String fileName = status.getPath().getName();
+ if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
+ continue;
+ }
+ long windowId = Long.parseLong(fileName, 16);
+ replayState.put(windowId, operatorId);
+ if (windowId > largestRecoveryWindow) {
+ largestRecoveryWindow = windowId;
+ }
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void save(Object object, int operatorId, long windowId) throws IOException
+ {
+ storageAgent.save(object, operatorId, windowId);
+ }
+
+ /**
+ * Loads the state of one operator for a window.
+ *
+ * @param operatorId operator id.
+ * @param windowId window id.
+ * @return the saved state, or null when the replay index has no entry for this operator and window.
+ * @throws IOException
+ */
+ @Override
+ public Object load(int operatorId, long windowId) throws IOException
+ {
+ Set<Integer> operators = replayState.get(windowId);
+ if (operators == null || !operators.contains(operatorId)) {
+ return null;
+ }
+ return storageAgent.load(operatorId, windowId);
+ }
+
+ @Override
+ public void delete(int operatorId, long windowId) throws IOException
+ {
+ storageAgent.delete(operatorId, windowId);
+ }
+
+ @Override
+ public Map<Integer, Object> load(long windowId) throws IOException
+ {
+ Set<Integer> operators = replayState.get(windowId);
+ if (operators == null) {
+ return null;
+ }
+ Map<Integer, Object> data = Maps.newHashMap();
+ for (int operatorId : operators) {
+ data.put(operatorId, load(operatorId, windowId));
+ }
+ return data;
+ }
+
+ /**
+ * @param operatorId operator id.
+ * @return window ids reported by the storage agent, or null when the operator directory is missing or empty.
+ * @throws IOException
+ */
+ @Override
+ public long[] getWindowIds(int operatorId) throws IOException
+ {
+ Path operatorPath = new Path(appPath, String.valueOf(operatorId));
+ if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) {
+ return null;
+ }
+ return storageAgent.getWindowIds(operatorId);
+ }
+
+ /**
+ * This deletes all the recovery files of window ids <= windowId.
+ *
+ * @param operatorId operator id.
+ * @param windowId the largest window id for which the states will be deleted.
+ * @throws IOException
+ */
+ @Override
+ public void deleteUpTo(int operatorId, long windowId) throws IOException
+ {
+ //deleting the replay state
+ if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
+ //Iterate over a snapshot of the window ids: removeAll(...) below structurally modifies replayState, which
+ //would make an iterator taken directly from its key set fail on the following next().
+ Iterator<Long> windowsIterator = Sets.newTreeSet(replayState.keySet()).iterator();
+ while (windowsIterator.hasNext()) {
+ long lwindow = windowsIterator.next();
+ if (lwindow > windowId) {
+ break;
+ }
+ for (Integer loperator : replayState.removeAll(lwindow)) {
+
+ if (deletedOperators.contains(loperator)) {
+ storageAgent.delete(loperator, lwindow);
+
+ Path loperatorPath = new Path(appPath, Integer.toString(loperator));
+ if (fs.listStatus(loperatorPath).length == 0) {
+ //The operator was deleted and it has nothing to replay.
+ deletedOperators.remove(loperator);
+ fs.delete(loperatorPath, true);
+ }
+ } else if (loperator == operatorId) {
+ storageAgent.delete(loperator, lwindow);
+ }
+ }
+ }
+ }
+
+ //An operator that has not saved anything yet has no directory; there is nothing to delete in that case.
+ Path operatorPath = new Path(appPath, Integer.toString(operatorId));
+ if (fs.exists(operatorPath) && fs.listStatus(operatorPath).length > 0) {
+ long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId);
+ Arrays.sort(windowsAfterReplay);
+ for (long lwindow : windowsAfterReplay) {
+ if (lwindow <= windowId) {
+ storageAgent.delete(operatorId, lwindow);
+ }
+ }
+ }
+ }
+
+ @Override
+ public long getLargestRecoveryWindow()
+ {
+ return largestRecoveryWindow;
+ }
+
+ /**
+ * Copies the recovery path and storage agent of this manager to every new manager and makes sure that one of
+ * the new managers tracks the removed operator ids.
+ *
+ * @param newManagers all the new managers; must be non-empty and contain only {@link FSWindowDataManager}s.
+ * @param removedOperatorIds set of operator ids which were removed after partitioning; may be null or empty.
+ */
+ @Override
+ public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+ {
+ Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(),
+ "there has to be one idempotent storage manager");
+ FSWindowDataManager deletedOperatorsManager = null;
+
+ if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
+ if (this.deletedOperators == null) {
+ this.deletedOperators = Sets.newHashSet();
+ }
+ this.deletedOperators.addAll(removedOperatorIds);
+ }
+
+ for (WindowDataManager storageManager : newManagers) {
+
+ FSWindowDataManager lmanager = (FSWindowDataManager)storageManager;
+ lmanager.recoveryPath = this.recoveryPath;
+ lmanager.storageAgent = this.storageAgent;
+
+ if (lmanager.deletedOperators != null) {
+ deletedOperatorsManager = lmanager;
+ }
+ //only one physical instance can manage deleted operators so clearing this field for rest of the instances.
+ if (lmanager != deletedOperatorsManager) {
+ lmanager.deletedOperators = null;
+ }
+ }
+
+ if (removedOperatorIds == null || removedOperatorIds.isEmpty()) {
+ //Nothing to do
+ return;
+ }
+ if (this.deletedOperators != null) {
+
+ /*If some operators were removed then there needs to be a manager which can clean their state when it is not
+ needed.*/
+ if (deletedOperatorsManager == null) {
+ //None of the managers were handling deleted operators data.
+ deletedOperatorsManager = (FSWindowDataManager)newManagers.iterator().next();
+ deletedOperatorsManager.deletedOperators = Sets.newHashSet();
+ }
+
+ deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds);
+ }
+ }
+
+ /**
+ * Closes the file system opened in {@link #setup(Context.OperatorContext)}. Does nothing when no file system
+ * was opened.
+ */
+ @Override
+ public void teardown()
+ {
+ if (fs == null) {
+ return;
+ }
+ try {
+ fs.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public String getRecoveryPath()
+ {
+ return recoveryPath;
+ }
+
+ public void setRecoveryPath(String recoveryPath)
+ {
+ this.recoveryPath = recoveryPath;
+ }
+ }
+
+ /**
+ * A {@link WindowDataManager} whose every operation is a no-op, so it never provides anything to recover.
+ * Operators can be written against {@link WindowDataManager} alone and be given this implementation when
+ * idempotency is not wanted.
+ */
+ class NoopWindowDataManager implements WindowDataManager
+ {
+ // ---- component lifecycle: nothing to acquire or release ----
+
+ @Override
+ public void setup(Context.OperatorContext context)
+ {
+ }
+
+ @Override
+ public void teardown()
+ {
+ }
+
+ // ---- storage agent operations: nothing is persisted ----
+
+ @Override
+ public void save(Object object, int operatorId, long windowId) throws IOException
+ {
+ }
+
+ /**
+ * @return always null.
+ */
+ @Override
+ public Object load(int operatorId, long windowId) throws IOException
+ {
+ return null;
+ }
+
+ @Override
+ public void delete(int operatorId, long windowId) throws IOException
+ {
+ }
+
+ /**
+ * @return always an empty array.
+ */
+ @Override
+ public long[] getWindowIds(int operatorId) throws IOException
+ {
+ return new long[0];
+ }
+
+ // ---- window data manager operations ----
+
+ /**
+ * @return always {@link Stateless#WINDOW_ID}.
+ */
+ @Override
+ public long getLargestRecoveryWindow()
+ {
+ return Stateless.WINDOW_ID;
+ }
+
+ /**
+ * @return always null.
+ */
+ @Override
+ public Map<Integer, Object> load(long windowId) throws IOException
+ {
+ return null;
+ }
+
+ @Override
+ public void deleteUpTo(int operatorId, long windowId) throws IOException
+ {
+ }
+
+ @Override
+ public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+ {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
index f2461e6..347dabf 100644
--- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
@@ -46,6 +46,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
/**
* Tests for {@link IdempotentStorageManager}
*/
+@Deprecated
public class IdempotentStorageManagerTest
{
private static class TestMeta extends TestWatcher
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
new file mode 100644
index 0000000..fdca73e
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+/**
+ * Tests for {@link WindowDataManager}
+ */
+public class WindowDataManagerTest
+{
+ /**
+ * Per-test fixture: creates an {@link WindowDataManager.FSWindowDataManager} rooted under
+ * <code>target/&lt;test class&gt;/&lt;test method&gt;</code> and removes the class directory afterwards.
+ */
+ private static class TestMeta extends TestWatcher
+ {
+
+ String applicationPath;
+ WindowDataManager.FSWindowDataManager storageManager;
+ Context.OperatorContext context;
+
+ @Override
+ protected void starting(Description description)
+ {
+ super.starting(description);
+ storageManager = new WindowDataManager.FSWindowDataManager();
+ applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+ Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+ attributes.put(DAG.APPLICATION_PATH, applicationPath);
+ context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+ storageManager.setup(context);
+ }
+
+ @Override
+ protected void finished(Description description)
+ {
+ storageManager.teardown();
+ try {
+ FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ @Rule
+ public TestMeta testMeta = new TestMeta();
+
+ @Test
+ public void testLargestRecoveryWindow()
+ {
+ Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
+ }
+
+ @Test
+ public void testSave() throws IOException
+ {
+ Map<Integer, String> data = Maps.newHashMap();
+ data.put(1, "one");
+ data.put(2, "two");
+ data.put(3, "three");
+ testMeta.storageManager.save(data, 1, 1);
+ //setup is invoked again so that the manager re-scans the recovery directory and indexes the saved state.
+ testMeta.storageManager.setup(testMeta.context);
+ @SuppressWarnings("unchecked")
+ Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
+ Assert.assertEquals("dataOf1", data, decoded);
+ }
+
+ @Test
+ public void testLoad() throws IOException
+ {
+ Map<Integer, String> dataOf1 = Maps.newHashMap();
+ dataOf1.put(1, "one");
+ dataOf1.put(2, "two");
+ dataOf1.put(3, "three");
+
+ Map<Integer, String> dataOf2 = Maps.newHashMap();
+ dataOf2.put(4, "four");
+ dataOf2.put(5, "five");
+ dataOf2.put(6, "six");
+
+ testMeta.storageManager.save(dataOf1, 1, 1);
+ testMeta.storageManager.save(dataOf2, 2, 1);
+ testMeta.storageManager.setup(testMeta.context);
+ Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
+ Assert.assertEquals("no of states", 2, decodedStates.size());
+ for (Integer operatorId : decodedStates.keySet()) {
+ if (operatorId == 1) {
+ Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
+ } else {
+ Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
+ }
+ }
+ }
+
+ @Test
+ public void testRecovery() throws IOException
+ {
+ Map<Integer, String> dataOf1 = Maps.newHashMap();
+ dataOf1.put(1, "one");
+ dataOf1.put(2, "two");
+ dataOf1.put(3, "three");
+
+ Map<Integer, String> dataOf2 = Maps.newHashMap();
+ dataOf2.put(4, "four");
+ dataOf2.put(5, "five");
+ dataOf2.put(6, "six");
+
+ testMeta.storageManager.save(dataOf1, 1, 1);
+ testMeta.storageManager.save(dataOf2, 2, 2);
+
+ testMeta.storageManager.setup(testMeta.context);
+ Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
+ }
+
+ @Test
+ public void testDelete() throws IOException
+ {
+ Map<Integer, String> dataOf1 = Maps.newHashMap();
+ dataOf1.put(1, "one");
+ dataOf1.put(2, "two");
+ dataOf1.put(3, "three");
+
+ Map<Integer, String> dataOf2 = Maps.newHashMap();
+ dataOf2.put(4, "four");
+ dataOf2.put(5, "five");
+ dataOf2.put(6, "six");
+
+ //state of operator 3 goes into its own map
+ Map<Integer, String> dataOf3 = Maps.newHashMap();
+ dataOf3.put(7, "seven");
+ dataOf3.put(8, "eight");
+ dataOf3.put(9, "nine");
+
+ testMeta.storageManager.save(dataOf1, 1, 1);
+ testMeta.storageManager.save(dataOf2, 2, 1);
+ testMeta.storageManager.save(dataOf3, 3, 1);
+
+ //operators 2 and 3 are reported as removed, so the manager becomes responsible for cleaning their state.
+ testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
+ Sets.newHashSet(2, 3));
+ testMeta.storageManager.setup(testMeta.context);
+ testMeta.storageManager.deleteUpTo(1, 1);
+
+ Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
+ FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
+ Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+ Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
+ Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
+ }
+
+}
[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1902' into
devel-3
Posted by br...@apache.org.
Merge branch 'MLHR-1902' into devel-3
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a4b999de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a4b999de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a4b999de
Branch: refs/heads/devel-3
Commit: a4b999de10fb033a8077b3dbb331fafef056c314
Parents: 2f97e7a dae30a0
Author: bright <br...@bright-mac.local>
Authored: Wed Nov 18 15:04:42 2015 -0800
Committer: bright <br...@bright-mac.local>
Committed: Wed Nov 18 15:04:42 2015 -0800
----------------------------------------------------------------------
.../lib/io/IdempotentStorageManager.java | 3 +-
.../datatorrent/lib/util/WindowDataManager.java | 404 +++++++++++++++++++
.../lib/io/IdempotentStorageManagerTest.java | 1 +
.../lib/util/WindowDataManagerTest.java | 188 +++++++++
4 files changed, 595 insertions(+), 1 deletion(-)
----------------------------------------------------------------------