You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/06 02:29:10 UTC
incubator-eagle git commit: [MINOR] Add makeSSS and makeSRS in
RouteSpec
Repository: incubator-eagle
Updated Branches:
refs/heads/master 976edcd86 -> f833e9831
[MINOR] Add makeSSS and makeSRS in RouteSpec
- Add makeSSS and makeSRS in RouteSpec
Author: r7raul1984 <ta...@yhd.com>
Closes #699 from r7raul1984/ROUTESPEC-MINOR.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f833e983
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f833e983
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f833e983
Branch: refs/heads/master
Commit: f833e9831f473660a084327f67418197b5f00d02
Parents: 976edcd
Author: r7raul1984 <ta...@yhd.com>
Authored: Tue Dec 6 10:29:05 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Tue Dec 6 10:29:05 2016 +0800
----------------------------------------------------------------------
.../alert/coordination/model/RouterSpec.java | 25 ++++++++++++++++++++
.../alert/engine/runner/StreamRouterBolt.java | 15 ++----------
2 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f833e983/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
index fc13c56..b3877c3 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/RouterSpec.java
@@ -17,9 +17,13 @@
package org.apache.eagle.alert.coordination.model;
import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class RouterSpec {
@@ -66,6 +70,27 @@ public class RouterSpec {
this.routerSpecs = routerSpecs;
}
+ public Map<StreamPartition, List<StreamRouterSpec>> makeSRS() {
+ Map<StreamPartition, List<StreamRouterSpec>> newSRS = new HashMap<>();
+ this.getRouterSpecs().forEach(t -> {
+ if (!newSRS.containsKey(t.getPartition())) {
+ newSRS.put(t.getPartition(), new ArrayList<>());
+ }
+ newSRS.get(t.getPartition()).add(t);
+ });
+ return newSRS;
+ }
+
+ public Map<StreamPartition, StreamSortSpec> makeSSS() {
+ Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>();
+ this.getRouterSpecs().forEach(t -> {
+ if (t.getPartition().getSortSpec() != null) {
+ newSSS.put(t.getPartition(), t.getPartition().getSortSpec());
+ }
+ });
+ return newSSS;
+ }
+
@Override
public String toString() {
return String.format("version:%s-topo:%s, boltSpec:%s", version, topologyName, routerSpecs);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f833e983/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
index 29ee771..e37b680 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/StreamRouterBolt.java
@@ -105,12 +105,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
sanityCheck(spec);
// figure out added, removed, modified StreamSortSpec
- Map<StreamPartition, StreamSortSpec> newSSS = new HashMap<>();
- spec.getRouterSpecs().forEach(t -> {
- if (t.getPartition().getSortSpec() != null) {
- newSSS.put(t.getPartition(), t.getPartition().getSortSpec());
- }
- });
+ Map<StreamPartition, StreamSortSpec> newSSS = spec.makeSSS();
Set<StreamPartition> newStreamIds = newSSS.keySet();
Set<StreamPartition> cachedStreamIds = cachedSSS.keySet();
@@ -138,13 +133,7 @@ public class StreamRouterBolt extends AbstractStreamBolt implements StreamRouter
cachedSSS = newSSS;
// figure out added, removed, modified StreamRouterSpec
- Map<StreamPartition, List<StreamRouterSpec>> newSRS = new HashMap<>();
- spec.getRouterSpecs().forEach(t -> {
- if (!newSRS.containsKey(t.getPartition())) {
- newSRS.put(t.getPartition(), new ArrayList<StreamRouterSpec>());
- }
- newSRS.get(t.getPartition()).add(t);
- });
+ Map<StreamPartition, List<StreamRouterSpec>> newSRS = spec.makeSRS();
Set<StreamPartition> newStreamPartitions = newSRS.keySet();
Set<StreamPartition> cachedStreamPartitions = cachedSRS.keySet();