You are viewing a plain-text version of this content. (The canonical link to the original message was a hyperlink that is not preserved in this text version.)
Posted to commits@apex.apache.org by th...@apache.org on 2016/02/12 22:00:50 UTC
incubator-apex-malhar git commit: Use an iterator to remove entries
instead of modifying the data structure directly, to avoid
ConcurrentModificationException
Repository: incubator-apex-malhar
Updated Branches:
refs/heads/release-3.3 b00ff80f4 -> 6ad18e8ca
Use an iterator to remove entries instead of modifying the data structure directly while iterating over it, to avoid ConcurrentModificationException
Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/6ad18e8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/6ad18e8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/6ad18e8c
Branch: refs/heads/release-3.3
Commit: 6ad18e8ca99ffefc064174a3e0bee62ae5805b5d
Parents: b00ff80
Author: Pramod Immaneni <pr...@datatorrent.com>
Authored: Fri Feb 5 13:19:17 2016 -0800
Committer: Thomas Weise <th...@datatorrent.com>
Committed: Fri Feb 12 12:55:29 2016 -0800
----------------------------------------------------------------------
.../lib/io/IdempotentStorageManager.java | 25 ++++++++---------
.../datatorrent/lib/util/WindowDataManager.java | 11 +++++---
.../lib/io/IdempotentStorageManagerTest.java | 28 ++++++++++++++------
.../lib/util/WindowDataManagerTest.java | 18 ++++++++++---
4 files changed, 55 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ad18e8c/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
index dae417d..4eac924 100644
--- a/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
+++ b/library/src/main/java/com/datatorrent/lib/io/IdempotentStorageManager.java
@@ -23,25 +23,23 @@ import java.util.*;
import javax.validation.constraints.NotNull;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.common.collect.TreeMultimap;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import com.google.common.collect.TreeMultimap;
+
import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.StorageAgent;
import com.datatorrent.api.annotation.Stateless;
-
-import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
-
import com.datatorrent.common.util.FSStorageAgent;
+import com.datatorrent.lib.io.fs.AbstractFileInputOperator;
/**
* An idempotent storage manager allows an operator to emit the same tuples in every replayed application window. An idempotent agent
@@ -61,6 +59,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
{
/**
* Gets the largest window for which there is recovery data.
+ * @return Returns the window id
*/
long getLargestRecoveryWindow();
@@ -233,13 +232,14 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
{
//deleting the replay state
if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
- Iterator<Long> windowsIterator = replayState.keySet().iterator();
- while (windowsIterator.hasNext()) {
- long lwindow = windowsIterator.next();
+ Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
+ long lwindow = windowEntry.getKey();
if (lwindow > windowId) {
break;
}
- for (Integer loperator : replayState.removeAll(lwindow)) {
+ for (Integer loperator : windowEntry.getValue()) {
if (deletedOperators.contains(loperator)) {
storageAgent.delete(loperator, lwindow);
@@ -255,6 +255,7 @@ public interface IdempotentStorageManager extends StorageAgent, Component<Contex
storageAgent.delete(loperator, lwindow);
}
}
+ iterator.remove();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ad18e8c/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
index 26a2e32..7517cd4 100644
--- a/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
+++ b/library/src/main/java/com/datatorrent/lib/util/WindowDataManager.java
@@ -61,6 +61,7 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
{
/**
* Gets the largest window for which there is recovery data.
+ * @return Returns the window id
*/
long getLargestRecoveryWindow();
@@ -232,13 +233,14 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
{
//deleting the replay state
if (windowId <= largestRecoveryWindow && deletedOperators != null && !deletedOperators.isEmpty()) {
- Iterator<Long> windowsIterator = replayState.keySet().iterator();
- while (windowsIterator.hasNext()) {
- long lwindow = windowsIterator.next();
+ Iterator<Map.Entry<Long, Collection<Integer>>> iterator = replayState.asMap().entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<Long, Collection<Integer>> windowEntry = iterator.next();
+ long lwindow = windowEntry.getKey();
if (lwindow > windowId) {
break;
}
- for (Integer loperator : replayState.removeAll(lwindow)) {
+ for (Integer loperator : windowEntry.getValue()) {
if (deletedOperators.contains(loperator)) {
storageAgent.delete(loperator, lwindow);
@@ -253,6 +255,7 @@ public interface WindowDataManager extends StorageAgent, Component<Context.Opera
storageAgent.delete(loperator, lwindow);
}
}
+ iterator.remove();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ad18e8c/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
index 347dabf..4b29830 100644
--- a/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/io/IdempotentStorageManagerTest.java
@@ -20,18 +20,22 @@ package com.datatorrent.lib.io;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
+import java.util.TreeSet;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestWatcher;
import org.junit.runner.Description;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@@ -40,7 +44,6 @@ import com.datatorrent.api.Attribute;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.annotation.Stateless;
-
import com.datatorrent.lib.helper.OperatorContextTestHelper;
/**
@@ -172,18 +175,27 @@ public class IdempotentStorageManagerTest
dataOf2.put(8, "eight");
dataOf2.put(9, "nine");
- testMeta.storageManager.save(dataOf1, 1, 1);
+ for (int i = 1; i <= 9; ++i) {
+ testMeta.storageManager.save(dataOf1, 1, i);
+ }
+
testMeta.storageManager.save(dataOf2, 2, 1);
testMeta.storageManager.save(dataOf3, 3, 1);
testMeta.storageManager.partitioned(Lists.<IdempotentStorageManager>newArrayList(testMeta.storageManager),
Sets.newHashSet(2, 3));
testMeta.storageManager.setup(testMeta.context);
- testMeta.storageManager.deleteUpTo(1, 1);
+ testMeta.storageManager.deleteUpTo(1, 6);
Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.recoveryPath);
FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
- Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+ FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+ Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+ TreeSet<String> windows = Sets.newTreeSet();
+ for (FileStatus fileStatus : fileStatuses) {
+ windows.add(fileStatus.getPath().getName());
+ }
+ Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows);
Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
}
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/6ad18e8c/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
index fdca73e..845b992 100644
--- a/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
+++ b/library/src/test/java/com/datatorrent/lib/util/WindowDataManagerTest.java
@@ -20,7 +20,9 @@ package com.datatorrent.lib.util;
import java.io.File;
import java.io.IOException;
+import java.util.Arrays;
import java.util.Map;
+import java.util.TreeSet;
import org.junit.Assert;
import org.junit.Rule;
@@ -30,6 +32,7 @@ import org.junit.runner.Description;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -169,18 +172,27 @@ public class WindowDataManagerTest
dataOf2.put(8, "eight");
dataOf2.put(9, "nine");
- testMeta.storageManager.save(dataOf1, 1, 1);
+ for (int i = 1; i <= 9; ++i) {
+ testMeta.storageManager.save(dataOf1, 1, i);
+ }
+
testMeta.storageManager.save(dataOf2, 2, 1);
testMeta.storageManager.save(dataOf3, 3, 1);
testMeta.storageManager.partitioned(Lists.<WindowDataManager>newArrayList(testMeta.storageManager),
Sets.newHashSet(2, 3));
testMeta.storageManager.setup(testMeta.context);
- testMeta.storageManager.deleteUpTo(1, 1);
+ testMeta.storageManager.deleteUpTo(1, 6);
Path appPath = new Path(testMeta.applicationPath + '/' + testMeta.storageManager.getRecoveryPath());
FileSystem fs = FileSystem.newInstance(appPath.toUri(), new Configuration());
- Assert.assertEquals("no data for 1", 0, fs.listStatus(new Path(appPath, Integer.toString(1))).length);
+ FileStatus[] fileStatuses = fs.listStatus(new Path(appPath, Integer.toString(1)));
+ Assert.assertEquals("number of windows for 1", 3, fileStatuses.length);
+ TreeSet<String> windows = Sets.newTreeSet();
+ for (FileStatus fileStatus : fileStatuses) {
+ windows.add(fileStatus.getPath().getName());
+ }
+ Assert.assertEquals("window list for 1", Sets.newLinkedHashSet(Arrays.asList("7", "8", "9")), windows);
Assert.assertEquals("no data for 2", false, fs.exists(new Path(appPath, Integer.toString(2))));
Assert.assertEquals("no data for 3", false, fs.exists(new Path(appPath, Integer.toString(3))));
}