You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/29 14:20:40 UTC
incubator-eagle git commit: [EAGLE-693] fix issue where application could not
detect stream change
Repository: incubator-eagle
Updated Branches:
refs/heads/master 4250e2d32 -> 4ff963b47
[EAGLE-693] fix issue where application could not detect stream change
Author: wujinhu <wu...@126.com>
Closes #580 from wujinhu/EAGLE-700.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4ff963b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4ff963b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4ff963b4
Branch: refs/heads/master
Commit: 4ff963b47fd82cb1ca8394027518a9183f39176e
Parents: 4250e2d
Author: wujinhu <wu...@126.com>
Authored: Sat Oct 29 22:20:20 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Sat Oct 29 22:20:20 2016 +0800
----------------------------------------------------------------------
.../eagle/app/service/ApplicationAction.java | 81 +++++++++++---------
.../src/main/bin/createTables.sql | 20 +++++
2 files changed, 65 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ff963b4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index b7258d3..bd0adfe 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -88,45 +88,51 @@ public class ApplicationAction implements Serializable {
}
public void doInstall() {
- if (metadata.getDescriptor().getStreams() != null) {
- List<StreamDesc> streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
- StreamDefinition copied = streamDefinition.copy();
- copied.setSiteId(metadata.getSite().getSiteId());
- copied.setStreamId(StreamIdConversions.formatSiteStreamId(metadata.getSite().getSiteId(), copied.getStreamId()));
- StreamSinkConfig streamSinkConfig = this.runtime.environment()
+ processStreams();
+ }
+
+ private void processStreams() {
+ if (metadata.getDescriptor().getStreams() == null) {
+ return;
+ }
+
+ List<StreamDesc> streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
+ StreamDefinition copied = streamDefinition.copy();
+ copied.setSiteId(metadata.getSite().getSiteId());
+ copied.setStreamId(StreamIdConversions.formatSiteStreamId(metadata.getSite().getSiteId(), copied.getStreamId()));
+ StreamSinkConfig streamSinkConfig = this.runtime.environment()
.streamSink().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig);
- StreamDesc streamDesc = new StreamDesc();
- streamDesc.setSchema(copied);
- streamDesc.setSink(streamSinkConfig);
- streamDesc.setStreamId(copied.getStreamId());
- return streamDesc;
- })).collect(Collectors.toList());
- metadata.setStreams(streamDescToInstall);
+ StreamDesc streamDesc = new StreamDesc();
+ streamDesc.setSchema(copied);
+ streamDesc.setSink(streamSinkConfig);
+ streamDesc.setStreamId(copied.getStreamId());
+ return streamDesc;
+ })).collect(Collectors.toList());
+ metadata.setStreams(streamDescToInstall);
- // TODO: Decouple converting from StreamSink to Alert DataSource
- // iterate each stream descriptor and create alert datasource for each
- for (StreamDesc streamDesc : streamDescToInstall) {
- // only take care of Kafka sink
- if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
- KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink();
- Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
- datasource.setType("KAFKA");
- datasource.setName(metadata.getAppId());
- datasource.setTopic(kafkaCfg.getTopicId());
- datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
- Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata();
- Properties prop = new Properties();
- prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId());
- tuple2Stream.setStreamNameSelectorProp(prop);
- tuple2Stream.setTimestampColumn("timestamp");
- tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName());
- datasource.setCodec(tuple2Stream);
- alertMetadataService.addDataSource(datasource);
+ // TODO: Decouple converting from StreamSink to Alert DataSource
+ // iterate each stream descriptor and create alert datasource for each
+ for (StreamDesc streamDesc : streamDescToInstall) {
+ // only take care of Kafka sink
+ if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
+ KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink();
+ Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+ datasource.setType("KAFKA");
+ datasource.setName(metadata.getAppId());
+ datasource.setTopic(kafkaCfg.getTopicId());
+ datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
+ Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata();
+ Properties prop = new Properties();
+ prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId());
+ tuple2Stream.setStreamNameSelectorProp(prop);
+ tuple2Stream.setTimestampColumn("timestamp");
+ tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName());
+ datasource.setCodec(tuple2Stream);
+ alertMetadataService.addDataSource(datasource);
- StreamDefinition sd = streamDesc.getSchema();
- sd.setDataSource(metadata.getAppId());
- alertMetadataService.createStream(streamDesc.getSchema());
- }
+ StreamDefinition sd = streamDesc.getSchema();
+ sd.setDataSource(metadata.getAppId());
+ alertMetadataService.createStream(streamDesc.getSchema());
}
}
}
@@ -144,6 +150,9 @@ public class ApplicationAction implements Serializable {
}
public void doStart() {
+ if (metadata.getStreams() == null) {
+ processStreams();
+ }
this.runtime.start(this.application, this.effectiveConfig);
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ff963b4/eagle-server-assembly/src/main/bin/createTables.sql
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/bin/createTables.sql b/eagle-server-assembly/src/main/bin/createTables.sql
index 749032b..da67d3d 100644
--- a/eagle-server-assembly/src/main/bin/createTables.sql
+++ b/eagle-server-assembly/src/main/bin/createTables.sql
@@ -38,4 +38,24 @@ CREATE TABLE IF NOT EXISTS sites (
createdtime bigint(20) DEFAULT NULL,
modifiedtime bigint(20) DEFAULT NULL,
UNIQUE (siteid)
+);
+
+CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity (
+ site varchar(20) DEFAULT NULL,
+ filedir varchar(100) DEFAULT NULL,
+ sensitivity_type varchar(20) DEFAULT NULL,
+ primary key (site, filedir)
+);
+
+CREATE TABLE IF NOT EXISTS ip_securityzone (
+ iphost varchar(100) DEFAULT NULL,
+ security_zone varchar(100) DEFAULT NULL,
+ primary key (iphost)
+);
+
+CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity (
+ site varchar(20) DEFAULT NULL,
+ hbase_resource varchar(100) DEFAULT NULL,
+ sensitivity_type varchar(20) DEFAULT NULL,
+ primary key (site, hbase_resource)
);
\ No newline at end of file