You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/12 22:00:50 UTC

incubator-apex-malhar git commit: Use an iterator to remove entries instead of modifying the data structure directly, to avoid ConcurrentModificationException

Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/release-3.3 b00ff80f4 -> 6ad18e8ca


Use an iterator to remove entries instead of modifying the data structure directly, to avoid ConcurrentModificationException


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6ad18e8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6ad18e8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6ad18e8c

Branch: refs/heads/release-3.3
Commit: 6ad18e8ca99ffefc064174a3e0bee62ae5805b5d
Parents: b00ff80
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Feb 5 13:19:17 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Feb 12 12:55:29 2016 -0800

----------------------------------------------------------------------
 .../lib/io/IdempotentStorageManager.java        | 25 ++++++++---------
 .../datatorrent/lib/util/WindowDataManager.java | 11 +++++---
 .../lib/io/IdempotentStorageManagerTest.java    | 28 ++++++++++++++------
 .../lib/util/WindowDataManagerTest.java         | 18 ++++++++++---
 4 files changed, 55 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ad18e8c/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
index dae417d..4eac924 100644
--- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
+++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
@@ -23,25 +23,23 @@ import java.util.*;
 
 import javax.validation.constraints.NotNull;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultimap;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+
 import com.datatorrent.api.Component;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.StorageAgent;
 import com.datatorrent.api.annotation.Stateless;
-
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
-
 import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
 
 /**
  * An idempotent storage manager allows an operator to emit the same tuples in every replayed application window. An idempotent agent
@@ -61,6 +59,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
 {
   /**
    * Gets the largest window for which there is recovery data.
+   * @return Returns the window id
    */
   long getLargestRecoveryWindow();
 
@@ -233,13 +232,14 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
     {
       //deleting the replay state
       if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
-        Iterator<Long> windowsIterator = replayState.keySet().iterator();
-        while (windowsIterator.hasNext()) {
-          long lwindow = windowsIterator.next();
+        Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
+          long lwindow = windowEntry.getKey();
           if (lwindow > windowId) {
             break;
           }
-          for (Integer loperator : replayState.removeAll(lwindow)) {
+          for (Integer loperator : windowEntry.getValue()) {
 
             if (deletedOperators.contains(loperator)) {
               storageAgent.delete(loperator, lwindow);
@@ -255,6 +255,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
               storageAgent.delete(loperator, lwindow);
             }
           }
+          iterator.remove();
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ad18e8c/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
index 26a2e32..7517cd4 100644
--- a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
+++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
@@ -61,6 +61,7 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
 {
   /**
    * Gets the largest window for which there is recovery data.
+   * @return Returns the window id
    */
   long getLargestRecoveryWindow();
 
@@ -232,13 +233,14 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
     {
       //deleting the replay state
       if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
-        Iterator<Long> windowsIterator = replayState.keySet().iterator();
-        while (windowsIterator.hasNext()) {
-          long lwindow = windowsIterator.next();
+        Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
+        while (iterator.hasNext()) {
+          Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
+          long lwindow = windowEntry.getKey();
           if (lwindow > windowId) {
             break;
           }
-          for (Integer loperator : replayState.removeAll(lwindow)) {
+          for (Integer loperator : windowEntry.getValue()) {
 
             if (deletedOperators.contains(loperator)) {
               storageAgent.delete(loperator, lwindow);
@@ -253,6 +255,7 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
               storageAgent.delete(loperator, lwindow);
             }
           }
+          iterator.remove();
         }
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ad18e8c/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
index 347dabf..4b29830 100644
--- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
@@ -20,18 +20,22 @@ package com.datatorrent.lib.io;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
+import java.util.TreeSet;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TestWatcher;
 import org.junit.runner.Description;
 
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -40,7 +44,6 @@ import com.datatorrent.api.Attribute;
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.annotation.Stateless;
-
 import com.datatorrent.lib.helper.OperatorContextTestHelper;
 
 /**
@@ -172,18 +175,27 @@ public class IdempotentStorageManagerTest
     dataOf2.put(8, "eight");
     dataOf2.put(9, "nine");
 
-    testMeta.storageManager.save(dataOf1, 1, 1);
+    for (int i = 1; i <= 9; ++i) {
+      testMeta.storageManager.save(dataOf1, 1, i);
+    }
+
     testMeta.storageManager.save(dataOf2, 2, 1);
     testMeta.storageManager.save(dataOf3, 3, 1);
 
     testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager),
       Sets.newHashSet(2, 3));
     testMeta.storageManager.setup(testMeta.context);
-    testMeta.storageManager.deleteUpTo(1, 1);
+    testMeta.storageManager.deleteUpTo(1, 6);
 
     Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.recoveryPath);
     FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
-    Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+    FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+    Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+    TreeSet<String> windows = Sets.newTreeSet();
+    for (FileStatus fileStatus : fileStatuses) {
+      windows.add(fileStatus.getPath().getName());
+    }
+    Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows);
     Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
     Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ad18e8c/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
index fdca73e..845b992 100644
--- a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
@@ -20,7 +20,9 @@ package com.datatorrent.lib.util;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Map;
+import java.util.TreeSet;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -30,6 +32,7 @@ import org.junit.runner.Description;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
@@ -169,18 +172,27 @@ public class WindowDataManagerTest
     dataOf2.put(8, "eight");
     dataOf2.put(9, "nine");
 
-    testMeta.storageManager.save(dataOf1, 1, 1);
+    for (int i = 1; i <= 9; ++i) {
+      testMeta.storageManager.save(dataOf1, 1, i);
+    }
+
     testMeta.storageManager.save(dataOf2, 2, 1);
     testMeta.storageManager.save(dataOf3, 3, 1);
 
     testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
         Sets.newHashSet(2, 3));
     testMeta.storageManager.setup(testMeta.context);
-    testMeta.storageManager.deleteUpTo(1, 1);
+    testMeta.storageManager.deleteUpTo(1, 6);
 
     Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
     FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
-    Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+    FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+    Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+    TreeSet<String> windows = Sets.newTreeSet();
+    for (FileStatus fileStatus : fileStatuses) {
+      windows.add(fileStatus.getPath().getName());
+    }
+    Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows);
     Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
     Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
   }