You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by ti...@apache.org on 2016/03/29 01:36:56 UTC

[1/2] incubator-apex-malhar git commit: APEXMALHAR-2031 enable WindowDataManager to save window files under user configured path

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/master c5cab8bd5 -> c3f6951a5


APEXMALHAR-2031 enable WindowDataManager to save window files under user configured path


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/dec419fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/dec419fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/dec419fc

Branch: refs/heads/master
Commit: dec419fc2ddbd4a82598a255c25cdf774d0c0efc
Parents: c5cab8b
Author: Chandni Singh <cs...@apache.org>
Authored: Mon Mar 28 15:57:19 2016 -0700
Committer: Chandni Singh <cs...@apache.org>
Committed: Mon Mar 28 16:14:59 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/util/WindowDataManager.java |  35 ++-
 .../lib/util/FSWindowDataManagerTest.java       | 216 +++++++++++++++++++
 .../com/datatorrent/lib/util/TestUtils.java     |  22 +-
 .../lib/util/WindowDataManagerTest.java         | 200 -----------------
 4 files changed, 267 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
index 7517cd4..9930d7e 100644
--- a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
+++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
@@ -113,6 +113,8 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
     @NotNull
     private String recoveryPath;
 
+    private boolean isRecoveryPathRelativeToAppPath = true;
+
     /**
      * largest window for which there is recovery data across all physical operator instances.
      */
@@ -144,7 +146,11 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
     public void setup(Context.OperatorContext context)
     {
       Configuration configuration = new Configuration();
-      appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+      if (isRecoveryPathRelativeToAppPath) {
+        appPath = new Path(context.getValue(DAG.APPLICATION_PATH) + Path.SEPARATOR + recoveryPath);
+      } else {
+        appPath = new Path(recoveryPath);
+      }
 
       try {
         storageAgent = new FSStorageAgent(appPath.toString(), configuration);
@@ -333,15 +339,42 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
       }
     }
 
+    /**
+     * @return recovery path
+     */
     public String getRecoveryPath()
     {
       return recoveryPath;
     }
 
+    /**
+     * Sets the recovery path. If {@link #isRecoveryPathRelativeToAppPath} is true then this path is handled relative
+     * to the application path; otherwise it is handled as an absolute path.
+     *
+     * @param recoveryPath recovery path
+     */
     public void setRecoveryPath(String recoveryPath)
     {
       this.recoveryPath = recoveryPath;
     }
+
+    /**
+     * @return true if recovery path is relative to app path; false otherwise.
+     */
+    public boolean isRecoveryPathRelativeToAppPath()
+    {
+      return isRecoveryPathRelativeToAppPath;
+    }
+
+    /**
+     * Specifies whether the recovery path is relative to application path.
+     *
+     * @param recoveryPathRelativeToAppPath true if recovery path is relative to application path; false otherwise.
+     */
+    public void setRecoveryPathRelativeToAppPath(boolean recoveryPathRelativeToAppPath)
+    {
+      isRecoveryPathRelativeToAppPath = recoveryPathRelativeToAppPath;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
new file mode 100644
index 0000000..26996e7
--- /dev/null
+++ b/library/src/test/java/com/datatorrent/lib/util/FSWindowDataManagerTest.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.lib.util;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Attribute;
+import com.datatorrent.api.Context;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.lib.helper.OperatorContextTestHelper;
+
+/**
+ * Tests for {@link WindowDataManager}
+ */
+public class FSWindowDataManagerTest
+{
+  private static class TestMeta extends TestWatcher
+  {
+
+    String applicationPath;
+    WindowDataManager.FSWindowDataManager storageManager;
+    Context.OperatorContext context;
+
+    @Override
+    protected void starting(Description description)
+    {
+      TestUtils.deleteTargetTestClassFolder(description);
+      super.starting(description);
+      storageManager = new WindowDataManager.FSWindowDataManager();
+      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
+
+      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
+      attributes.put(DAG.APPLICATION_PATH, applicationPath);
+      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
+    }
+
+    @Override
+    protected void finished(Description description)
+    {
+      TestUtils.deleteTargetTestClassFolder(description);
+    }
+  }
+
+  @Rule
+  public TestMeta testMeta = new TestMeta();
+
+  @Test
+  public void testLargestRecoveryWindow()
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
+    testMeta.storageManager.teardown();
+  }
+
+  @Test
+  public void testSave() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> data = Maps.newHashMap();
+    data.put(1, "one");
+    data.put(2, "two");
+    data.put(3, "three");
+    testMeta.storageManager.save(data, 1, 1);
+    testMeta.storageManager.setup(testMeta.context);
+    @SuppressWarnings("unchecked")
+    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
+    Assert.assertEquals("dataOf1", data, decoded);
+    testMeta.storageManager.teardown();
+  }
+
+  @Test
+  public void testLoad() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 1);
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
+    Assert.assertEquals("no of states", 2, decodedStates.size());
+    for (Integer operatorId : decodedStates.keySet()) {
+      if (operatorId == 1) {
+        Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
+      } else {
+        Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
+      }
+    }
+    testMeta.storageManager.teardown();
+  }
+
+  @Test
+  public void testRecovery() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    testMeta.storageManager.save(dataOf1, 1, 1);
+    testMeta.storageManager.save(dataOf2, 2, 2);
+
+    testMeta.storageManager.setup(testMeta.context);
+    Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
+    testMeta.storageManager.teardown();
+  }
+
+  @Test
+  public void testDelete() throws IOException
+  {
+    testMeta.storageManager.setup(testMeta.context);
+    Map<Integer, String> dataOf1 = Maps.newHashMap();
+    dataOf1.put(1, "one");
+    dataOf1.put(2, "two");
+    dataOf1.put(3, "three");
+
+    Map<Integer, String> dataOf2 = Maps.newHashMap();
+    dataOf2.put(4, "four");
+    dataOf2.put(5, "five");
+    dataOf2.put(6, "six");
+
+    Map<Integer, String> dataOf3 = Maps.newHashMap();
+    dataOf2.put(7, "seven");
+    dataOf2.put(8, "eight");
+    dataOf2.put(9, "nine");
+
+    for (int i = 1; i <= 9; ++i) {
+      testMeta.storageManager.save(dataOf1, 1, i);
+    }
+
+    testMeta.storageManager.save(dataOf2, 2, 1);
+    testMeta.storageManager.save(dataOf3, 3, 1);
+
+    testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
+        Sets.newHashSet(2, 3));
+    testMeta.storageManager.setup(testMeta.context);
+    testMeta.storageManager.deleteUpTo(1, 6);
+
+    Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
+    FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
+    FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+    Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+    TreeSet<String> windows = Sets.newTreeSet();
+    for (FileStatus fileStatus : fileStatuses) {
+      windows.add(fileStatus.getPath().getName());
+    }
+    Assert.assertEquals("window list for 1", Sets.newTreeSet(Arrays.asList("7", "8", "9")), windows);
+    Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
+    Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
+    testMeta.storageManager.teardown();
+  }
+
+  @Test
+  public void testAbsoluteRecoveryPath() throws IOException
+  {
+    testMeta.storageManager.setRecoveryPathRelativeToAppPath(false);
+    long time = System.currentTimeMillis();
+    testMeta.storageManager.setRecoveryPath("target/" + time);
+    testSave();
+    File recoveryDir = new File("target/" + time);
+    Assert.assertTrue("recover path exist", recoveryDir.isDirectory());
+    FileUtils.deleteDirectory(recoveryDir);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
index 37d55e7..37aa7e7 100644
--- a/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
+++ b/library/src/test/java/com/datatorrent/lib/util/TestUtils.java
@@ -18,18 +18,21 @@
  */
 package com.datatorrent.lib.util;
 
-import java.io.ByteArrayOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.util.List;
 
 import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
+import org.apache.commons.io.FileUtils;
 
-import com.datatorrent.api.*;
+import com.datatorrent.api.DefaultPartition;
+import com.datatorrent.api.Operator;
 import com.datatorrent.api.Operator.OutputPort;
+import com.datatorrent.api.Sink;
+import com.datatorrent.api.Stats;
+import com.datatorrent.api.StatsListener;
 
 public class TestUtils
 {
@@ -51,6 +54,15 @@ public class TestUtils
     }
   }
 
+  public static void deleteTargetTestClassFolder(Description description)
+  {
+    try {
+      FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public static <S extends Sink, T> S setSink(OutputPort<T> port, S sink)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/dec419fc/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
deleted file mode 100644
index 845b992..0000000
--- a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
+++ /dev/null
@@ -1,200 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package com.datatorrent.lib.util;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Map;
-import java.util.TreeSet;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.annotation.Stateless;
-import com.datatorrent.lib.helper.OperatorContextTestHelper;
-
-/**
- * Tests for {@link WindowDataManager}
- */
-public class WindowDataManagerTest
-{
-  private static class TestMeta extends TestWatcher
-  {
-
-    String applicationPath;
-    WindowDataManager.FSWindowDataManager storageManager;
-    Context.OperatorContext context;
-
-    @Override
-    protected void starting(Description description)
-    {
-      super.starting(description);
-      storageManager = new WindowDataManager.FSWindowDataManager();
-      applicationPath = "target/" + description.getClassName() + "/" + description.getMethodName();
-
-      Attribute.AttributeMap.DefaultAttributeMap attributes = new Attribute.AttributeMap.DefaultAttributeMap();
-      attributes.put(DAG.APPLICATION_PATH, applicationPath);
-      context = new OperatorContextTestHelper.TestIdOperatorContext(1, attributes);
-
-      storageManager.setup(context);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      storageManager.teardown();
-      try {
-        FileUtils.deleteDirectory(new File("target/" + description.getClassName()));
-      } catch (IOException e) {
-        throw new RuntimeException(e);
-      }
-    }
-  }
-
-  @Rule
-  public TestMeta testMeta = new TestMeta();
-
-  @Test
-  public void testLargestRecoveryWindow()
-  {
-    Assert.assertEquals("largest recovery", Stateless.WINDOW_ID, testMeta.storageManager.getLargestRecoveryWindow());
-  }
-
-  @Test
-  public void testSave() throws IOException
-  {
-    Map<Integer, String> data = Maps.newHashMap();
-    data.put(1, "one");
-    data.put(2, "two");
-    data.put(3, "three");
-    testMeta.storageManager.save(data, 1, 1);
-    testMeta.storageManager.setup(testMeta.context);
-    @SuppressWarnings("unchecked")
-    Map<Integer, String> decoded = (Map<Integer, String>)testMeta.storageManager.load(1, 1);
-    Assert.assertEquals("dataOf1", data, decoded);
-  }
-
-  @Test
-  public void testLoad() throws IOException
-  {
-    Map<Integer, String> dataOf1 = Maps.newHashMap();
-    dataOf1.put(1, "one");
-    dataOf1.put(2, "two");
-    dataOf1.put(3, "three");
-
-    Map<Integer, String> dataOf2 = Maps.newHashMap();
-    dataOf2.put(4, "four");
-    dataOf2.put(5, "five");
-    dataOf2.put(6, "six");
-
-    testMeta.storageManager.save(dataOf1, 1, 1);
-    testMeta.storageManager.save(dataOf2, 2, 1);
-    testMeta.storageManager.setup(testMeta.context);
-    Map<Integer, Object> decodedStates = testMeta.storageManager.load(1);
-    Assert.assertEquals("no of states", 2, decodedStates.size());
-    for (Integer operatorId : decodedStates.keySet()) {
-      if (operatorId == 1) {
-        Assert.assertEquals("data of 1", dataOf1, decodedStates.get(1));
-      } else {
-        Assert.assertEquals("data of 2", dataOf2, decodedStates.get(2));
-      }
-    }
-  }
-
-  @Test
-  public void testRecovery() throws IOException
-  {
-    Map<Integer, String> dataOf1 = Maps.newHashMap();
-    dataOf1.put(1, "one");
-    dataOf1.put(2, "two");
-    dataOf1.put(3, "three");
-
-    Map<Integer, String> dataOf2 = Maps.newHashMap();
-    dataOf2.put(4, "four");
-    dataOf2.put(5, "five");
-    dataOf2.put(6, "six");
-
-    testMeta.storageManager.save(dataOf1, 1, 1);
-    testMeta.storageManager.save(dataOf2, 2, 2);
-
-    testMeta.storageManager.setup(testMeta.context);
-    Assert.assertEquals("largest recovery window", 2, testMeta.storageManager.getLargestRecoveryWindow());
-  }
-
-  @Test
-  public void testDelete() throws IOException
-  {
-    Map<Integer, String> dataOf1 = Maps.newHashMap();
-    dataOf1.put(1, "one");
-    dataOf1.put(2, "two");
-    dataOf1.put(3, "three");
-
-    Map<Integer, String> dataOf2 = Maps.newHashMap();
-    dataOf2.put(4, "four");
-    dataOf2.put(5, "five");
-    dataOf2.put(6, "six");
-
-    Map<Integer, String> dataOf3 = Maps.newHashMap();
-    dataOf2.put(7, "seven");
-    dataOf2.put(8, "eight");
-    dataOf2.put(9, "nine");
-
-    for (int i = 1; i <= 9; ++i) {
-      testMeta.storageManager.save(dataOf1, 1, i);
-    }
-
-    testMeta.storageManager.save(dataOf2, 2, 1);
-    testMeta.storageManager.save(dataOf3, 3, 1);
-
-    testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
-        Sets.newHashSet(2, 3));
-    testMeta.storageManager.setup(testMeta.context);
-    testMeta.storageManager.deleteUpTo(1, 6);
-
-    Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
-    FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
-    FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
-    Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
-    TreeSet<String> windows = Sets.newTreeSet();
-    for (FileStatus fileStatus : fileStatuses) {
-      windows.add(fileStatus.getPath().getName());
-    }
-    Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows);
-    Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
-    Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
-  }
-
-}


[2/2] incubator-apex-malhar git commit: Merge branch 'APEXMALHAR-2031'

Posted by ti...@apache.org.
Merge branch 'APEXMALHAR-2031'


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/c3f6951a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/c3f6951a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/c3f6951a

Branch: refs/heads/master
Commit: c3f6951a5466f822debbd5cd700ed72f4f48155c
Parents: c5cab8b dec419f
Author: Timothy Farkas <ti...@datatorrent.com>
Authored: Mon Mar 28 16:36:36 2016 -0700
Committer: Timothy Farkas <ti...@datatorrent.com>
Committed: Mon Mar 28 16:36:36 2016 -0700

----------------------------------------------------------------------
 .../datatorrent/lib/util/WindowDataManager.java |  35 ++-
 .../lib/util/FSWindowDataManagerTest.java       | 216 +++++++++++++++++++
 .../com/datatorrent/lib/util/TestUtils.java     |  22 +-
 .../lib/util/WindowDataManagerTest.java         | 200 -----------------
 4 files changed, 267 insertions(+), 206 deletions(-)
----------------------------------------------------------------------