You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/06 02:29:10 UTC

incubator-eagle git commit: [MINOR] Add makeSSS and makeSRS in RouteSpec

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 976edcd86 -> f833e9831


[MINOR] Add makeSSS and makeSRS in RouteSpec

- Add makeSSS and makeSRS in RouteSpec

Author: r7raul1984 <ta...@yhd.com>

Closes #699 from r7raul1984/ROUTESPEC-MINOR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f833e983
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f833e983
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f833e983

Branch: refs/heads/master
Commit: f833e9831f473660a084327f67418197b5f00d02
Parents: 976edcd
Author: r7raul1984 <ta...@yhd.com>
Authored: Tue Dec 6 10:29:05 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Dec 6 10:29:05 2016 +0800

----------------------------------------------------------------------
 .../alert/coordination/model/RouterSpec.java    | 25 ++++++++++++++++++++
 .../alert/engine/runner/StreamRouterBolt.java   | 15 ++----------
 2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f833e983/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
index fc13c56..b3877c3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
@@ -17,9 +17,13 @@
 package org.apache.eagle.alert.coordination.model;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 
 public class RouterSpec {
@@ -66,6 +70,27 @@ public class RouterSpec {
         this.routerSpecs = routerSpecs;
     }
 
+    public Map<StreamPartition, List<StreamRouterSpec>> makeSRS() {
+        Map<StreamPartition, List<StreamRouterSpec>> newSRS = new HashMap<>();
+        this.getRouterSpecs().forEach(t -> {
+            if (!newSRS.containsKey(t.getPartition())) {
+                newSRS.put(t.getPartition(), new ArrayList<>());
+            }
+            newSRS.get(t.getPartition()).add(t);
+        });
+        return newSRS;
+    }
+
+    public Map<StreamPartition, StreamSortSpec> makeSSS() {
+        Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>();
+        this.getRouterSpecs().forEach(t -> {
+            if (t.getPartition().getSortSpec() != null) {
+                newSSS.put(t.getPartition(), t.getPartition().getSortSpec());
+            }
+        });
+        return newSSS;
+    }
+
     @Override
     public String toString() {
         return String.format("version:%s-topo:%s, boltSpec:%s", version, topologyName, routerSpecs);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f833e983/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 29ee771..e37b680 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -105,12 +105,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
         sanityCheck(spec);
 
         // figure out added, removed, modified StreamSortSpec
-        Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>();
-        spec.getRouterSpecs().forEach(t -> {
-            if (t.getPartition().getSortSpec() != null) {
-                newSSS.put(t.getPartition(), t.getPartition().getSortSpec());
-            }
-        });
+        Map<StreamPartition, StreamSortSpec> newSSS = spec.makeSSS();
 
         Set<StreamPartition> newStreamIds = newSSS.keySet();
         Set<StreamPartition> cachedStreamIds = cachedSSS.keySet();
@@ -138,13 +133,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
         cachedSSS = newSSS;
 
         // figure out added, removed, modified StreamRouterSpec
-        Map<StreamPartition, List<StreamRouterSpec>> newSRS = new HashMap<>();
-        spec.getRouterSpecs().forEach(t -> {
-            if (!newSRS.containsKey(t.getPartition())) {
-                newSRS.put(t.getPartition(), new ArrayList<StreamRouterSpec>());
-            }
-            newSRS.get(t.getPartition()).add(t);
-        });
+        Map<StreamPartition, List<StreamRouterSpec>> newSRS = spec.makeSRS();
 
         Set<StreamPartition> newStreamPartitions = newSRS.keySet();
         Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet();