Posted to commits@apex.apache.org by br...@apache.org on 2015/11/19 00:05:52 UTC

[1/2] incubator-apex-malhar git commit: MLHR-1902 #comment renamed IdempotentStorageManager to WindowDataManager

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 2f97e7a40 -> a4b999de1


MLHR-1902 #comment renamed IdempotentStorageManager to WindowDataManager


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/dae30a03
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/dae30a03
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/dae30a03

Branch: refs/heads/devel-3
Commit: dae30a03e53bebc404a3852c2c8d23c7619c5fd1
Parents: 2f97e7a
Author: Chandni Singh <cs...@apache.org>
Authored: Wed Nov 18 13:26:21 2015 -0800
Committer: Chandni Singh <cs...@apache.org>
Committed: Wed Nov 18 14:17:17 2015 -0800

----------------------------------------------------------------------
 .../lib/io/IdempotentStorageManager.java        |   3 +-
 .../datatorrent/lib/util/WindowDataManager.java | 404 +++++++++++++++++++
 .../lib/io/IdempotentStorageManagerTest.java    |   1 +
 .../lib/util/WindowDataManagerTest.java         | 188 +++++++++
 4 files changed, 595 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
index 545398e..dae417d 100644
--- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
+++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
@@ -54,8 +54,9 @@ import com.datatorrent.common.util.FSStorageAgent;
  * application window boundaries.
  *
  * @since 2.0.0
+ * @deprecated use {@link com.datatorrent.lib.util.WindowDataManager}
  */
-
+@Deprecated
 public interface IdempotentStorageManager extends StorageAgent, Component<Context.OperatorContext>
 {
   /**
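
For operators that still reference the deprecated interface, migration amounts to switching the
field type and import; a minimal sketch follows (the old and new package names are taken from this
diff, while the operator class and the nested FSIdempotentStorageManager name on the old interface
are assumptions for illustration only):

import com.datatorrent.lib.util.WindowDataManager;

public class MyInputOperator // hypothetical operator, for illustration only
{
  // Before (deprecated):
  //   private com.datatorrent.lib.io.IdempotentStorageManager manager =
  //       new com.datatorrent.lib.io.IdempotentStorageManager.FSIdempotentStorageManager();

  // After: the contract (save/load/deleteUpTo keyed by operator id and window id) is unchanged.
  private WindowDataManager windowDataManager = new WindowDataManager.FSWindowDataManager();

  public void setWindowDataManager(WindowDataManager windowDataManager)
  {
    this.windowDataManager = windowDataManager;
  }

  public WindowDataManager getWindowDataManager()
  {
    return windowDataManager;
  }
}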

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
new file mode 100644
index 0000000..26a2e32
--- /dev/null
+++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
@@ -0,0 +1,404 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+
+import com.datatorrent.api.Component;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.StorageAgent;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
+
+/**
+ * A window data manager allows an operator to emit the same tuples in every replayed application window.
+ * It cannot make any guarantees about the tuples emitted in the application window during which a failure occurs.
+ *
+ * The order of tuples is guaranteed for ordered input sources.
+ *
+ * <b>Important:</b> In order for a window data manager to function correctly it cannot allow
+ * checkpoints to occur within an application window and checkpoints must be aligned with
+ * application window boundaries.
+ *
+ * @since 2.0.0
+ */
+public interface WindowDataManager extends StorageAgent, Component<Context.OperatorContext>
+{
+  /**
+   * Gets the largest window for which there is recovery data.
+   */
+  long getLargestRecoveryWindow();
+
+  /**
+   * When an operator can partition itself dynamically, there is no guarantee that an input state which was being
+   * handled by one instance previously will be handled by the same instance after partitioning. <br/>
+   * For example, an {@link AbstractFileInputOperator} instance which reads a file X till offset l (not check-pointed)
+   * may no longer be the instance that handles file X after repartitioning, since the number of instances may have
+   * changed and file X is re-hashed to another instance. <br/>
+   * The new instance wouldn't know from what point to read file X unless it reads the saved state of all the
+   * operators for the window being replayed and fixes its own state accordingly.
+   *
+   * @param windowId window id.
+   * @return mapping of operator id to the corresponding state
+   * @throws IOException
+   */
+  Map<Integer, Object> load(long windowId) throws IOException;
+
+  /**
+   * Delete the artifacts of the operator for windows <= windowId.
+   *
+   * @param operatorId operator id
+   * @param windowId   window id
+   * @throws IOException
+   */
+  void deleteUpTo(int operatorId, long windowId) throws IOException;
+
+  /**
+   * This informs the window data manager that the operator has been partitioned so that it can set properties and
+   * distribute state.
+   *
+   * @param newManagers        all the new window data managers.
+   * @param removedOperatorIds set of operator ids which were removed after partitioning.
+   */
+  void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds);
+
+  /**
+   * A {@link WindowDataManager} that uses a file system to persist state.
+   */
+  class FSWindowDataManager implements WindowDataManager
+  {
+    private static final String DEF_RECOVERY_PATH = "idempotentState";
+
+    protected transient FSStorageAgent storageAgent;
+
+    /**
+     * Recovery path relative to app path where state is saved.
+     */
+    @NotNull
+    private String recoveryPath;
+
+    /**
+     * largest window for which there is recovery data across all physical operator instances.
+     */
+    protected transient long largestRecoveryWindow;
+
+    /**
+     * This is non-null for only one physical instance.<br/>
+     * It consists of operator ids which have been deleted but still have some state that can be replayed.
+     * Only one of the instances handles (modifies) the files that belong to this state.
+     */
+    protected Set<Integer> deletedOperators;
+
+    /**
+     * Sorted mapping from window id to all the operators that have state to replay for that window.
+     */
+    protected final transient TreeMultimap<Long, Integer> replayState;
+
+    protected transient FileSystem fs;
+    protected transient Path appPath;
+
+    public FSWindowDataManager()
+    {
+      replayState = TreeMultimap.create();
+      largestRecoveryWindow = Stateless.WINDOW_ID;
+      recoveryPath = DEF_RECOVERY_PATH;
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      Configuration configuration = new Configuration();
+      appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+
+      try {
+        storageAgent = new FSStorageAgent(appPath.toString(), configuration);
+
+        fs = FileSystem.newInstance(appPath.toUri(), configuration);
+
+        if (fs.exists(appPath)) {
+          FileStatus[] fileStatuses = fs.listStatus(appPath);
+
+          for (FileStatus operatorDirStatus : fileStatuses) {
+            int operatorId = Integer.parseInt(operatorDirStatus.getPath().getName());
+
+            for (FileStatus status : fs.listStatus(operatorDirStatus.getPath())) {
+              String fileName = status.getPath().getName();
+              if (fileName.endsWith(FSStorageAgent.TMP_FILE)) {
+                continue;
+              }
+              long windowId = Long.parseLong(fileName, 16);
+              replayState.put(windowId, operatorId);
+              if (windowId > largestRecoveryWindow) {
+                largestRecoveryWindow = windowId;
+              }
+            }
+          }
+        }
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    @Override
+    public void save(Object object, int operatorId, long windowId) throws IOException
+    {
+      storageAgent.save(object, operatorId, windowId);
+    }
+
+    @Override
+    public Object load(int operatorId, long windowId) throws IOException
+    {
+      Set<Integer> operators = replayState.get(windowId);
+      if (operators == null || !operators.contains(operatorId)) {
+        return null;
+      }
+      return storageAgent.load(operatorId, windowId);
+    }
+
+    @Override
+    public void delete(int operatorId, long windowId) throws IOException
+    {
+      storageAgent.delete(operatorId, windowId);
+    }
+
+    @Override
+    public Map<Integer, Object> load(long windowId) throws IOException
+    {
+      Set<Integer> operators = replayState.get(windowId);
+      if (operators == null) {
+        return null;
+      }
+      Map<Integer, Object> data = Maps.newHashMap();
+      for (int operatorId : operators) {
+        data.put(operatorId, load(operatorId, windowId));
+      }
+      return data;
+    }
+
+    @Override
+    public long[] getWindowIds(int operatorId) throws IOException
+    {
+      Path operatorPath = new Path(appPath, String.valueOf(operatorId));
+      if (!fs.exists(operatorPath) || fs.listStatus(operatorPath).length == 0) {
+        return null;
+      }
+      return storageAgent.getWindowIds(operatorId);
+    }
+
+    /**
+     * This deletes all the recovery files of window ids <= windowId.
+     *
+     * @param operatorId operator id.
+     * @param windowId   the largest window id for which the states will be deleted.
+     * @throws IOException
+     */
+    @Override
+    public void deleteUpTo(int operatorId, long windowId) throws IOException
+    {
+      //deleting the replay state
+      if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
+        Iterator<Long> windowsIterator = replayState.keySet().iterator();
+        while (windowsIterator.hasNext()) {
+          long lwindow = windowsIterator.next();
+          if (lwindow > windowId) {
+            break;
+          }
+          for (Integer loperator : replayState.removeAll(lwindow)) {
+
+            if (deletedOperators.contains(loperator)) {
+              storageAgent.delete(loperator, lwindow);
+
+              Path loperatorPath = new Path(appPath, Integer.toString(loperator));
+              if (fs.listStatus(loperatorPath).length == 0) {
+                //The operator was deleted and it has nothing to replay.
+                deletedOperators.remove(loperator);
+                fs.delete(loperatorPath, true);
+              }
+            } else if (loperator == operatorId) {
+              storageAgent.delete(loperator, lwindow);
+            }
+          }
+        }
+      }
+
+      if (fs.listStatus(new Path(appPath, Integer.toString(operatorId))).length > 0) {
+        long[] windowsAfterReplay = storageAgent.getWindowIds(operatorId);
+        Arrays.sort(windowsAfterReplay);
+        for (long lwindow : windowsAfterReplay) {
+          if (lwindow <= windowId) {
+            storageAgent.delete(operatorId, lwindow);
+          }
+        }
+      }
+    }
+
+    @Override
+    public long getLargestRecoveryWindow()
+    {
+      return largestRecoveryWindow;
+    }
+
+    @Override
+    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+    {
+      Preconditions.checkArgument(newManagers != null && !newManagers.isEmpty(),
+          "there has to be one idempotent storage manager");
+      FSWindowDataManager deletedOperatorsManager = null;
+
+      if (removedOperatorIds != null && !removedOperatorIds.isEmpty()) {
+        if (this.deletedOperators == null) {
+          this.deletedOperators = Sets.newHashSet();
+        }
+        this.deletedOperators.addAll(removedOperatorIds);
+      }
+
+      for (WindowDataManager storageManager : newManagers) {
+
+        FSWindowDataManager lmanager = (FSWindowDataManager)storageManager;
+        lmanager.recoveryPath = this.recoveryPath;
+        lmanager.storageAgent = this.storageAgent;
+
+        if (lmanager.deletedOperators != null) {
+          deletedOperatorsManager = lmanager;
+        }
+        //only one physical instance can manage deleted operators, so clear this field for the rest of the instances.
+        if (lmanager != deletedOperatorsManager) {
+          lmanager.deletedOperators = null;
+        }
+      }
+
+      if (removedOperatorIds == null || removedOperatorIds.isEmpty()) {
+        //Nothing to do
+        return;
+      }
+      if (this.deletedOperators != null) {
+
+        /*If some operators were removed then there needs to be a manager which can clean their state when it is no
+        longer needed.*/
+        if (deletedOperatorsManager == null) {
+          //None of the managers were handling the deleted operators' data.
+          deletedOperatorsManager = (FSWindowDataManager)newManagers.iterator().next();
+          deletedOperatorsManager.deletedOperators = Sets.newHashSet();
+        }
+
+        deletedOperatorsManager.deletedOperators.addAll(removedOperatorIds);
+      }
+    }
+
+    @Override
+    public void teardown()
+    {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+
+    public String getRecoveryPath()
+    {
+      return recoveryPath;
+    }
+
+    public void setRecoveryPath(String recoveryPath)
+    {
+      this.recoveryPath = recoveryPath;
+    }
+  }
+
+  /**
+   * This {@link WindowDataManager} never performs recovery. It is a convenience class so that operators
+   * can use the same code path whether idempotent replay is enabled or not.
+   */
+  class NoopWindowDataManager implements WindowDataManager
+  {
+    @Override
+    public long getLargestRecoveryWindow()
+    {
+      return Stateless.WINDOW_ID;
+    }
+
+    @Override
+    public Map<Integer, Object> load(long windowId) throws IOException
+    {
+      return null;
+    }
+
+    @Override
+    public void partitioned(Collection<WindowDataManager> newManagers, Set<Integer> removedOperatorIds)
+    {
+    }
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+    }
+
+    @Override
+    public void teardown()
+    {
+    }
+
+    @Override
+    public void save(Object object, int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public Object load(int operatorId, long windowId) throws IOException
+    {
+      return null;
+    }
+
+    @Override
+    public void delete(int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public void deleteUpTo(int operatorId, long windowId) throws IOException
+    {
+    }
+
+    @Override
+    public long[] getWindowIds(int operatorId) throws IOException
+    {
+      return new long[0];
+    }
+  }
+}
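
For context, here is a minimal usage sketch (not part of this commit) showing how an input operator
might wire in the manager above: save the tuples emitted per window, replay them after a failure, and
drop recovery state once windows are committed. The imports assume the Apex operator API of this era
(com.datatorrent.common.util.BaseOperator, InputOperator, Operator.CheckpointListener,
OperatorContext.getId()); replayTuples(), readAndEmit() and emittedInWindow() are hypothetical hooks
standing in for operator-specific logic.

import java.io.IOException;

import com.datatorrent.api.Context.OperatorContext;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator.CheckpointListener;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.WindowDataManager;

public abstract class ReplayableInput extends BaseOperator implements InputOperator, CheckpointListener
{
  private WindowDataManager windowDataManager = new WindowDataManager.FSWindowDataManager();

  private transient int operatorId;
  private transient long currentWindowId;
  private transient boolean replaying;

  @Override
  public void setup(OperatorContext context)
  {
    operatorId = context.getId();
    windowDataManager.setup(context);
  }

  @Override
  public void beginWindow(long windowId)
  {
    currentWindowId = windowId;
    //windows up to the largest recovery window are replayed from the saved per-window state
    replaying = windowId <= windowDataManager.getLargestRecoveryWindow();
    if (replaying) {
      try {
        replayTuples(windowDataManager.load(operatorId, windowId));
      } catch (IOException e) {
        throw new RuntimeException("replay of window " + windowId, e);
      }
    }
  }

  @Override
  public void emitTuples()
  {
    if (!replaying) {
      readAndEmit(); //hypothetical hook: read new input and remember the tuples emitted in this window
    }
  }

  @Override
  public void endWindow()
  {
    if (!replaying) {
      try {
        //record what was emitted so that a restart replays this window identically
        windowDataManager.save(emittedInWindow(), operatorId, currentWindowId);
      } catch (IOException e) {
        throw new RuntimeException("save of window " + currentWindowId, e);
      }
    }
  }

  @Override
  public void checkpointed(long windowId)
  {
  }

  @Override
  public void committed(long windowId)
  {
    try {
      //committed windows will never be replayed, so their recovery state can be dropped
      windowDataManager.deleteUpTo(operatorId, windowId);
    } catch (IOException e) {
      throw new RuntimeException("delete up to window " + windowId, e);
    }
  }

  @Override
  public void teardown()
  {
    windowDataManager.teardown();
  }

  public void setWindowDataManager(WindowDataManager windowDataManager)
  {
    this.windowDataManager = windowDataManager;
  }

  protected abstract void replayTuples(Object state); //hypothetical hook: re-emit the recorded tuples

  protected abstract void readAndEmit(); //hypothetical hook: normal (non-replay) processing

  protected abstract Object emittedInWindow(); //hypothetical hook: tuples emitted in the current window
}

Swapping a NoopWindowDataManager into setWindowDataManager(...) turns replay off without touching the
operator logic above: its largest recovery window is Stateless.WINDOW_ID, so nothing is ever replayed,
and its save/load calls are no-ops.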

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
index f2461e6..347dabf 100644
--- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
@@ -46,6 +46,7 @@ import com.datatorrent.lib.helper.OperatorContextTestHelper;
 /**
  * Tests for {@link IdempotentStorageManager}
  */
+@Deprecated
 public class IdempotentStorageManagerTest
 {
   private static class TestMeta extends TestWatcher

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dae30a03/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
new file mode 100644
index 0000000..fdca73e
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+/**
+ * Tests for {@link WindowDataManager}
+ */
+public class WindowDataManagerTest
+{
+  private static class TestMeta extends TestWatcher
+  {
+
+    String applicationPath;
+    WindowDataManager.FSWindowDataManager storageManager;
+    Context.OperatorContext context;
+
+    @Override
+    protected void starting(Description description)
+    {
+      super.starting(description);
+      storageManager = new WindowDataManager.FSWindowDataManager();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+
+      storageManager.setup(context);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      storageManager.teardown();
+      try {
+        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testLargestRecoveryWindow()
+  {
+    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
+  }
+
+  @Test
+  public void testSave() throws IOException
+  {
+    Map<Integer, String> data = Maps.newHashMap();
+    data.put(1, "one");
+    data.put(2, "two");
+    data.put(3, "three");
+    testMeta.storageManager.save(data, 1, 1);
+    testMeta.storageManager.setup(testMeta.context);
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
+    Assert.assertEquals("dataOf1", data, decoded);
+  }
+
+  @Test
+  public void testLoad() throws IOException
+  {
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 1);
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
+    Assert.assertEquals("no of states", 2, decodedStates.size());
+    for (Integer operatorId : decodedStates.keySet()) {
+      if (operatorId == 1) {
+        Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
+      } else {
+        Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
+      }
+    }
+  }
+
+  @Test
+  public void testRecovery() throws IOException
+  {
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 2);
+
+    testMeta.storageManager.setup(testMeta.context);
+    Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
+  }
+
+  @Test
+  public void testDelete() throws IOException
+  {
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    Map<Integer, String> dataOf3 = Maps.newHashMap();
+    dataOf3.put(7, "seven");
+    dataOf3.put(8, "eight");
+    dataOf3.put(9, "nine");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 1);
+    testMeta.storageManager.save(dataOf3, 3, 1);
+
+    testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
+        Sets.newHashSet(2, 3));
+    testMeta.storageManager.setup(testMeta.context);
+    testMeta.storageManager.deleteUpTo(1, 1);
+
+    Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
+    FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
+    Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+    Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
+    Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
+  }
+
+}


[2/2] incubator-apex-malhar git commit: Merge branch 'MLHR-1902' into devel-3

Posted by br...@apache.org.
Merge branch 'MLHR-1902' into devel-3


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/a4b999de
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/a4b999de
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/a4b999de

Branch: refs/heads/devel-3
Commit: a4b999de10fb033a8077b3dbb331fafef056c314
Parents: 2f97e7a dae30a0
Author: bright <br...@bright-mac.local>
Authored: Wed Nov 18 15:04:42 2015 -0800
Committer: bright <br...@bright-mac.local>
Committed: Wed Nov 18 15:04:42 2015 -0800

----------------------------------------------------------------------
 .../lib/io/IdempotentStorageManager.java        |   3 +-
 .../datatorrent/lib/util/WindowDataManager.java | 404 +++++++++++++++++++
 .../lib/io/IdempotentStorageManagerTest.java    |   1 +
 .../lib/util/WindowDataManagerTest.java         | 188 +++++++++
 4 files changed, 595 insertions(+), 1 deletion(-)
----------------------------------------------------------------------