You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/02/24 03:50:46 UTC

eagle git commit: [EAGLE-926] Alert engine fails to support more than one stream consumption from an application

Repository: eagle
Updated Branches:
  refs/heads/master 3011fdf84 -> 75586658b


[EAGLE-926] Alert engine fails to support more than one stream consumption from an application

https://issues.apache.org/jira/browse/EAGLE-926

Author: Zhao, Qingwen <qi...@apache.org>

Closes #841 from qingwen220/EAGLE-926.


Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/75586658
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/75586658
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/75586658

Branch: refs/heads/master
Commit: 75586658ba1aa202ee0ebec8c4cc52546cb0aae2
Parents: 3011fdf
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri Feb 24 11:50:35 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri Feb 24 11:50:35 2017 +0800

----------------------------------------------------------------------
 .../engine/coordinator/StreamDefinition.java    | 44 +++++++++++++-------
 .../alert/engine/runner/AlertPublisherBolt.java |  2 +-
 .../eagle/app/service/ApplicationAction.java    |  7 ++--
 3 files changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
index 9512f1a..fd5d5a6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -52,6 +52,9 @@ public class StreamDefinition implements Serializable {
     // Stream data source ID
     private String dataSource;
 
+    // Stream source ID (the ID of the application that owns this stream)
+    private String streamSource;
+
     // Tenant (Site) ID
     private String siteId;
 
@@ -70,14 +73,15 @@ public class StreamDefinition implements Serializable {
     @Override
     public int hashCode() {
         return new HashCodeBuilder()
-            .append(this.streamId)
-            .append(this.description)
-            .append(this.validate)
-            .append(this.timeseries)
-            .append(this.dataSource)
-            .append(this.siteId)
-            .append(this.columns)
-            .build();
+                .append(this.streamId)
+                .append(this.description)
+                .append(this.validate)
+                .append(this.timeseries)
+                .append(this.dataSource)
+                .append(streamSource)
+                .append(this.siteId)
+                .append(this.columns)
+                .build();
     }
 
     @Override
@@ -88,13 +92,15 @@ public class StreamDefinition implements Serializable {
         if (!(obj instanceof StreamDefinition)) {
             return false;
         }
-        return Objects.equals(this.streamId, ((StreamDefinition) obj).streamId)
-            && Objects.equals(this.description, ((StreamDefinition) obj).description)
-            && Objects.equals(this.validate, ((StreamDefinition) obj).validate)
-            && Objects.equals(this.timeseries, ((StreamDefinition) obj).timeseries)
-            && Objects.equals(this.dataSource, ((StreamDefinition) obj).dataSource)
-            && Objects.equals(this.siteId, ((StreamDefinition) obj).siteId)
-            && CollectionUtils.isEqualCollection(this.columns, ((StreamDefinition) obj).columns);
+        StreamDefinition streamDefinition = (StreamDefinition) obj;
+        return Objects.equals(this.streamId, streamDefinition.streamId)
+            && Objects.equals(this.description, streamDefinition.description)
+            && Objects.equals(this.validate, streamDefinition.validate)
+            && Objects.equals(this.timeseries, streamDefinition.timeseries)
+            && Objects.equals(this.dataSource, streamDefinition.dataSource)
+                && Objects.equals(this.streamSource, streamDefinition.streamSource)
+            && Objects.equals(this.siteId, streamDefinition.siteId)
+            && CollectionUtils.isEqualCollection(this.columns, streamDefinition.columns);
     }
 
     public String getStreamId() {
@@ -166,6 +172,14 @@ public class StreamDefinition implements Serializable {
         this.siteId = siteId;
     }
 
+    public String getStreamSource() {
+        return streamSource;
+    }
+
+    public void setStreamSource(String streamSource) {
+        this.streamSource = streamSource;
+    }
+
     public StreamDefinition copy() {
         StreamDefinition copied = new StreamDefinition();
         copied.setColumns(this.getColumns());

http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 2b57e96..44a5fe9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -189,7 +189,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
                         StreamDefinition sd = alertPublisherBolt.streamDefinitionMap.get(inputStreamId);
                         if (sd != null) {
                             extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId());
-                            appIds.add(sd.getDataSource());
+                            appIds.add(sd.getStreamSource());
                         }
                     }
                     extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds);

http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index a502f81..3733e4d 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -142,7 +142,7 @@ public class ApplicationAction implements Serializable {
                 KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSinkConfig();
                 Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
                 datasource.setType("KAFKA");
-                datasource.setName(metadata.getAppId());
+                datasource.setName(streamDesc.getStreamId());
                 datasource.setTopic(kafkaCfg.getTopicId());
                 datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
                 datasource.setProperties(new HashMap<>());
@@ -164,7 +164,8 @@ public class ApplicationAction implements Serializable {
                 alertMetadataService.addDataSource(datasource);
 
                 StreamDefinition sd = streamDesc.getSchema();
-                sd.setDataSource(metadata.getAppId());
+                sd.setDataSource(streamDesc.getStreamId());
+                sd.setStreamSource(metadata.getAppId());
                 alertMetadataService.createStream(streamDesc.getSchema());
             }
         }
@@ -177,7 +178,7 @@ public class ApplicationAction implements Serializable {
         }
         // iterate each stream descriptor and create alert datasource for each
         for (StreamDesc streamDesc : metadata.getStreams()) {
-            alertMetadataService.removeDataSource(metadata.getAppId());
+            alertMetadataService.removeDataSource(streamDesc.getStreamId());
             alertMetadataService.removeStream(streamDesc.getStreamId());
         }
     }