You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by qi...@apache.org on 2017/02/24 03:50:46 UTC
eagle git commit: [EAGLE-926] Alert engine fails to support more than
one stream consumption from an application
Repository: eagle
Updated Branches:
refs/heads/master 3011fdf84 -> 75586658b
[EAGLE-926] Alert engine fails to support more than one stream consumption from an application
https://issues.apache.org/jira/browse/EAGLE-926
Author: Zhao, Qingwen <qi...@apache.org>
Closes #841 from qingwen220/EAGLE-926.
Project: http://git-wip-us.apache.org/repos/asf/eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/eagle/commit/75586658
Tree: http://git-wip-us.apache.org/repos/asf/eagle/tree/75586658
Diff: http://git-wip-us.apache.org/repos/asf/eagle/diff/75586658
Branch: refs/heads/master
Commit: 75586658ba1aa202ee0ebec8c4cc52546cb0aae2
Parents: 3011fdf
Author: Zhao, Qingwen <qi...@apache.org>
Authored: Fri Feb 24 11:50:35 2017 +0800
Committer: Zhao, Qingwen <qi...@apache.org>
Committed: Fri Feb 24 11:50:35 2017 +0800
----------------------------------------------------------------------
.../engine/coordinator/StreamDefinition.java | 44 +++++++++++++-------
.../alert/engine/runner/AlertPublisherBolt.java | 2 +-
.../eagle/app/service/ApplicationAction.java | 7 ++--
3 files changed, 34 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
index 9512f1a..fd5d5a6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/StreamDefinition.java
@@ -52,6 +52,9 @@ public class StreamDefinition implements Serializable {
// Stream data source ID
private String dataSource;
+ //
+ private String streamSource;
+
// Tenant (Site) ID
private String siteId;
@@ -70,14 +73,15 @@ public class StreamDefinition implements Serializable {
@Override
public int hashCode() {
return new HashCodeBuilder()
- .append(this.streamId)
- .append(this.description)
- .append(this.validate)
- .append(this.timeseries)
- .append(this.dataSource)
- .append(this.siteId)
- .append(this.columns)
- .build();
+ .append(this.streamId)
+ .append(this.description)
+ .append(this.validate)
+ .append(this.timeseries)
+ .append(this.dataSource)
+ .append(streamSource)
+ .append(this.siteId)
+ .append(this.columns)
+ .build();
}
@Override
@@ -88,13 +92,15 @@ public class StreamDefinition implements Serializable {
if (!(obj instanceof StreamDefinition)) {
return false;
}
- return Objects.equals(this.streamId, ((StreamDefinition) obj).streamId)
- && Objects.equals(this.description, ((StreamDefinition) obj).description)
- && Objects.equals(this.validate, ((StreamDefinition) obj).validate)
- && Objects.equals(this.timeseries, ((StreamDefinition) obj).timeseries)
- && Objects.equals(this.dataSource, ((StreamDefinition) obj).dataSource)
- && Objects.equals(this.siteId, ((StreamDefinition) obj).siteId)
- && CollectionUtils.isEqualCollection(this.columns, ((StreamDefinition) obj).columns);
+ StreamDefinition streamDefinition = (StreamDefinition) obj;
+ return Objects.equals(this.streamId, streamDefinition.streamId)
+ && Objects.equals(this.description, streamDefinition.description)
+ && Objects.equals(this.validate, streamDefinition.validate)
+ && Objects.equals(this.timeseries, streamDefinition.timeseries)
+ && Objects.equals(this.dataSource, streamDefinition.dataSource)
+ && Objects.equals(this.streamSource, streamDefinition.streamSource)
+ && Objects.equals(this.siteId, streamDefinition.siteId)
+ && CollectionUtils.isEqualCollection(this.columns, streamDefinition.columns);
}
public String getStreamId() {
@@ -166,6 +172,14 @@ public class StreamDefinition implements Serializable {
this.siteId = siteId;
}
+ public String getStreamSource() {
+ return streamSource;
+ }
+
+ public void setStreamSource(String streamSource) {
+ this.streamSource = streamSource;
+ }
+
public StreamDefinition copy() {
StreamDefinition copied = new StreamDefinition();
copied.setColumns(this.getColumns());
http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 2b57e96..44a5fe9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -189,7 +189,7 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
StreamDefinition sd = alertPublisherBolt.streamDefinitionMap.get(inputStreamId);
if (sd != null) {
extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId());
- appIds.add(sd.getDataSource());
+ appIds.add(sd.getStreamSource());
}
}
extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds);
http://git-wip-us.apache.org/repos/asf/eagle/blob/75586658/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index a502f81..3733e4d 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -142,7 +142,7 @@ public class ApplicationAction implements Serializable {
KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSinkConfig();
Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
datasource.setType("KAFKA");
- datasource.setName(metadata.getAppId());
+ datasource.setName(streamDesc.getStreamId());
datasource.setTopic(kafkaCfg.getTopicId());
datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
datasource.setProperties(new HashMap<>());
@@ -164,7 +164,8 @@ public class ApplicationAction implements Serializable {
alertMetadataService.addDataSource(datasource);
StreamDefinition sd = streamDesc.getSchema();
- sd.setDataSource(metadata.getAppId());
+ sd.setDataSource(streamDesc.getStreamId());
+ sd.setStreamSource(metadata.getAppId());
alertMetadataService.createStream(streamDesc.getSchema());
}
}
@@ -177,7 +178,7 @@ public class ApplicationAction implements Serializable {
}
// iterate each stream descriptor and create alert datasource for each
for (StreamDesc streamDesc : metadata.getStreams()) {
- alertMetadataService.removeDataSource(metadata.getAppId());
+ alertMetadataService.removeDataSource(streamDesc.getStreamId());
alertMetadataService.removeStream(streamDesc.getStreamId());
}
}