You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@apex.apache.org by vr...@apache.org on 2017/07/15 15:50:34 UTC

[apex-core] branch master updated: APEXCORE-756: Fix ConcurrentModificationException in GroupingManager and code refactoring

This is an automated email from the ASF dual-hosted git repository.

vrozov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/apex-core.git


The following commit(s) were added to refs/heads/master by this push:
     new 3a1017c  APEXCORE-756: Fix ConcurrentModificationException in GroupingManager and code refactoring
3a1017c is described below

commit 3a1017c5af995d7f7d7781ff9daeb293059eda8a
Author: priya <pr...@apache.org>
AuthorDate: Mon Jul 10 16:41:04 2017 +0530

    APEXCORE-756: Fix ConcurrentModificationException in GroupingManager and code refactoring
---
 .../stram/StreamingAppMasterService.java           |  4 ++--
 .../stram/StreamingContainerManager.java           |  4 ++--
 .../stram/StreamingContainerParent.java            |  6 +++---
 .../java/com/datatorrent/stram/api/StramEvent.java |  2 +-
 .../events/grouping}/GroupingManager.java          | 24 +++++++++++++---------
 .../events/grouping}/GroupingRequest.java          | 11 +++++++++-
 .../events/grouping}/GroupingManagerTest.java      |  5 +----
 .../events/grouping}/GroupingRequestTest.java      |  4 +---
 8 files changed, 34 insertions(+), 26 deletions(-)

diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
index 762c7cb..5de8288 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingAppMasterService.java
@@ -44,13 +44,13 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
 import org.apache.apex.engine.api.plugin.PluginLocator;
+import org.apache.apex.engine.events.grouping.GroupingManager;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.DefaultApexPluginDispatcher;
 import org.apache.apex.engine.plugin.loaders.ChainedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.PropertyBasedPluginLocator;
 import org.apache.apex.engine.plugin.loaders.ServiceLoaderBasedPluginLocator;
-import org.apache.apex.stram.GroupingManager;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.lang3.tuple.MutablePair;
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
index 8d2406f..07641d2 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerManager.java
@@ -66,11 +66,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
+import org.apache.apex.engine.events.grouping.GroupingManager;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
 import org.apache.apex.engine.plugin.ApexPluginDispatcher;
 import org.apache.apex.engine.plugin.NoOpApexPluginDispatcher;
 import org.apache.apex.engine.util.CascadeStorageAgent;
-import org.apache.apex.stram.GroupingManager;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.builder.ToStringBuilder;
diff --git a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
index 8401931..af571ab 100644
--- a/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
+++ b/engine/src/main/java/com/datatorrent/stram/StreamingContainerParent.java
@@ -25,10 +25,10 @@ import java.util.Collections;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.apex.engine.events.grouping.GroupingManager;
+import org.apache.apex.engine.events.grouping.GroupingRequest;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
 import org.apache.apex.log.LogFileInformation;
-import org.apache.apex.stram.GroupingManager;
-import org.apache.apex.stram.GroupingRequest;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.ipc.ProtocolSignature;
diff --git a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
index 6224856..b73717f 100644
--- a/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
+++ b/engine/src/main/java/com/datatorrent/stram/api/StramEvent.java
@@ -20,8 +20,8 @@ package com.datatorrent.stram.api;
 
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
 import org.apache.apex.log.LogFileInformation;
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
 
 import com.datatorrent.stram.plan.logical.requests.LogicalPlanRequest;
 
diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingManager.java b/engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingManager.java
similarity index 91%
rename from engine/src/main/java/org/apache/apex/stram/GroupingManager.java
rename to engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingManager.java
index b160dd6..fc5c1d6 100644
--- a/engine/src/main/java/org/apache/apex/stram/GroupingManager.java
+++ b/engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingManager.java
@@ -16,8 +16,9 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.stram;
+package org.apache.apex.engine.events.grouping;
 
+import java.util.Iterator;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
@@ -25,7 +26,7 @@ import java.util.Set;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.apex.stram.GroupingRequest.EventGroupId;
+import org.apache.apex.engine.events.grouping.GroupingRequest.EventGroupId;
 
 import com.google.common.collect.Maps;
 
@@ -62,7 +63,10 @@ public class GroupingManager
    */
   public GroupingRequest getGroupingRequest(String containerId)
   {
-    return groupingRequests.get(containerId);
+    if (containerId != null) {
+      return groupingRequests.get(containerId);
+    }
+    return null;
   }
 
   /**
@@ -75,7 +79,7 @@ public class GroupingManager
   public EventGroupId getEventGroupIdForContainer(String containerId)
   {
     EventGroupId groupId = null;
-    if (groupingRequests.get(containerId) != null) {
+    if (containerId != null && groupingRequests.get(containerId) != null) {
       groupId = groupingRequests.get(containerId).getEventGroupId();
     }
     return groupId;
@@ -153,14 +157,14 @@ public class GroupingManager
    */
   public void removeProcessedGroupingRequests()
   {
-    for (Entry<String, GroupingRequest> request : groupingRequests.entrySet()) {
-      if (request.getValue().getOperatorsToDeploy().size() == 0
-          && request.getValue().getOperatorsToUndeploy().size() == 0) {
-        LOG.info("Removing for :" + request.getKey());
-        groupingRequests.remove(request.getKey());
+    Iterator<Entry<String, GroupingRequest>> itr = groupingRequests.entrySet().iterator();
+    while (itr.hasNext()) {
+      Entry<String, GroupingRequest> entry = itr.next();
+      if (entry.getValue().isProcessed()) {
+        LOG.debug("Removing Grouping request for : {}", entry.getKey());
+        itr.remove();
       }
     }
-
   }
 
   /**
diff --git a/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java b/engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingRequest.java
similarity index 94%
rename from engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
rename to engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingRequest.java
index d107a38..c2f352d 100644
--- a/engine/src/main/java/org/apache/apex/stram/GroupingRequest.java
+++ b/engine/src/main/java/org/apache/apex/engine/events/grouping/GroupingRequest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.apex.stram;
+package org.apache.apex.engine.events.grouping;
 
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -129,6 +129,15 @@ public class GroupingRequest
   }
 
   /**
+   * Checks if request is processed
+   * @return isProcessed
+   */
+  public boolean isProcessed()
+  {
+    return (getOperatorsToDeploy().isEmpty() && getOperatorsToUndeploy().isEmpty());
+  }
+
+  /**
    * EventGroupId is used to club relevant events. Events triggered by common
    * cause are considered as relevant events.
    *
diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java b/engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingManagerTest.java
similarity index 97%
rename from engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
rename to engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingManagerTest.java
index 7e33e97..1524bed 100644
--- a/engine/src/test/java/org/apach/apex/stram/GroupingManagerTest.java
+++ b/engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingManagerTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apach.apex.stram;
+package org.apache.apex.engine.events.grouping;
 
 import org.junit.After;
 import org.junit.Assert;
@@ -25,9 +25,6 @@ import org.junit.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
-import org.apache.apex.stram.GroupingManager;
-import org.apache.apex.stram.GroupingRequest;
-
 import com.google.common.collect.ImmutableSet;
 
 import com.datatorrent.stram.plan.physical.PTContainer;
diff --git a/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java b/engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingRequestTest.java
similarity index 96%
rename from engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
rename to engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingRequestTest.java
index 3417715..46b4b52 100644
--- a/engine/src/test/java/org/apach/apex/stram/GroupingRequestTest.java
+++ b/engine/src/test/java/org/apache/apex/engine/events/grouping/GroupingRequestTest.java
@@ -16,14 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apach.apex.stram;
+package org.apache.apex.engine.events.grouping;
 
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import org.apache.apex.stram.GroupingRequest;
-
 public class GroupingRequestTest
 {
   private GroupingRequest underTest;

-- 
To stop receiving notification emails like this one, please contact
commits@apex.apache.org.