You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/07/15 15:50:34 UTC
[apex-core] branch master updated: APEXCORE-756: Fix
ConcurrentModificationException in GroupingManager and code refactoring
This is an automated email from the ASF dual-hosted git repository.
vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git
The following commit(s) were added to refs/heads/master by this push:
new 3a1017c APEXCORE-756: Fix ConcurrentModificationException in GroupingManager and code refactoring
3a1017c is described below
commit 3a1017c5af995d7f7d7781ff9daeb293059eda8a
Author: priya <pr...@apache.org>
AuthorDate: Mon Jul 10 16:41:04 2017 +0530
APEXCORE-756: Fix ConcurrentModificationException in GroupingManager and code refactoring
---
.../stram/StreamingAppMasterService.java | 4 ++--
.../stram/StreamingContainerManager.java | 4 ++--
.../stram/StreamingContainerParent.java | 6 +++---
.../java/com/datatorrent/stram/api/StramEvent.java | 2 +-
.../events/grouping}/GroupingManager.java | 24 +++++++++++++---------
.../events/grouping}/GroupingRequest.java | 11 +++++++++-
.../events/grouping}/GroupingManagerTest.java | 5 +----
.../events/grouping}/GroupingRequestTest.java | 4 +---
8 files changed, 34 insertions(+), 26 deletions(-)
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 762c7cb..5de8288 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -44,13 +44,13 @@ import org.slf4j.LoggerFactory;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
import org.apache.apex.engine.api.plugin.PluginLocator;
+import org.apache.apex.engine.events.grouping.GroupingManager;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.apex.engine.plugin.DefaultApexPluginDispatcher;
import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
-import org.apache.apex.stram.GroupingManager;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.tuple.MutablePair;
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 8d2406f..07641d2 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -66,11 +66,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
+import org.apache.apex.engine.events.grouping.GroupingManager;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
import org.apache.apex.engine.util.CascadeStorageAgent;
-import org.apache.apex.stram.GroupingManager;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.builder.ToStringBuilder;
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
index 8401931..af571ab 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
@@ -25,10 +25,10 @@ import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.apex.engine.events.grouping.GroupingManager;
+import org.apache.apex.engine.events.grouping.GroupingRequest;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
import org.apache.apex.log.LogFileInformation;
-import org.apache.apex.stram.GroupingManager;
-import org.apache.apex.stram.GroupingRequest;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.ipc.ProtocolSignature;
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
index 6224856..b73717f 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
@@ -20,8 +20,8 @@ package com.datatorrent.stram.api;
import java.util.concurrent.atomic.AtomicLong;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
import org.apache.apex.log.LogFileInformation;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingManager.java b/engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingManager.java
similarity index 91%
rename from engine/src/main/java/org/apache/apex/stram/GroupingManager.java
rename to engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingManager.java
index b160dd6..fc5c1d6 100644
--- a/engine/src/main/java/org/apache/apex/stram/GroupingManager.java
+++ b/engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingManager.java
@@ -16,8 +16,9 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.stram;
+package org.apache.apex.engine.events.grouping;
+import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -25,7 +26,7 @@ import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
import com.google.common.collect.Maps;
@@ -62,7 +63,10 @@ public class GroupingManager
*/
public GroupingRequest getGroupingRequest(String containerId)
{
- return groupingRequests.get(containerId);
+ if (containerId != null) {
+ return groupingRequests.get(containerId);
+ }
+ return null;
}
/**
@@ -75,7 +79,7 @@ public class GroupingManager
public EventGroupId getEventGroupIdForContainer(String containerId)
{
EventGroupId groupId = null;
- if (groupingRequests.get(containerId) != null) {
+ if (containerId != null && groupingRequests.get(containerId) != null) {
groupId = groupingRequests.get(containerId).getEventGroupId();
}
return groupId;
@@ -153,14 +157,14 @@ public class GroupingManager
*/
public void removeProcessedGroupingRequests()
{
- for (Entry<String, GroupingRequest> request : groupingRequests.entrySet()) {
- if (request.getValue().getOperatorsToDeploy().size() == 0
- && request.getValue().getOperatorsToUndeploy().size() == 0) {
- LOG.info("Removing for :" + request.getKey());
- groupingRequests.remove(request.getKey());
+ Iterator<Entry<String, GroupingRequest>> itr = groupingRequests.entrySet().iterator();
+ while (itr.hasNext()) {
+ Entry<String, GroupingRequest> entry = itr.next();
+ if (entry.getValue().isProcessed()) {
+ LOG.debug("Removing Grouping request for : {}", entry.getKey());
+ itr.remove();
}
}
-
}
/**
diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java b/engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingRequest.java
similarity index 94%
rename from engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
rename to engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingRequest.java
index d107a38..c2f352d 100644
--- a/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
+++ b/engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingRequest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.apex.stram;
+package org.apache.apex.engine.events.grouping;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
@@ -129,6 +129,15 @@ public class GroupingRequest
}
/**
+ * Checks if request is processed
+ * @return isProcessed
+ */
+ public boolean isProcessed()
+ {
+ return (getOperatorsToDeploy().isEmpty() && getOperatorsToUndeploy().isEmpty());
+ }
+
+ /**
* EventGroupId is used to club relevant events. Events triggered by common
* cause are considered as relevant events.
*
diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java b/engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingManagerTest.java
similarity index 97%
rename from engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
rename to engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingManagerTest.java
index 7e33e97..1524bed 100644
--- a/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
+++ b/engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingManagerTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apach.apex.stram;
+package org.apache.apex.engine.events.grouping;
import org.junit.After;
import org.junit.Assert;
@@ -25,9 +25,6 @@ import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
-import org.apache.apex.stram.GroupingManager;
-import org.apache.apex.stram.GroupingRequest;
-
import com.google.common.collect.ImmutableSet;
import com.datatorrent.stram.plan.physical.PTContainer;
diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java b/engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingRequestTest.java
similarity index 96%
rename from engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
rename to engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingRequestTest.java
index 3417715..46b4b52 100644
--- a/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
+++ b/engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingRequestTest.java
@@ -16,14 +16,12 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apach.apex.stram;
+package org.apache.apex.engine.events.grouping;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.apache.apex.stram.GroupingRequest;
-
public class GroupingRequestTest
{
private GroupingRequest underTest;
--
To stop receiving notification emails like this one, please contact
['"commits@apex.apache.org" <co...@apex.apache.org>'].