You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/10/29 14:20:40 UTC

incubator-eagle git commit: [EAGLE-693] fix application could not detect stream change

Repository: incubator-eagle
Updated Branches:
  refs/heads/master 4250e2d32 -> 4ff963b47


[EAGLE-693] fix application could not detect stream change

Author: wujinhu <wu...@126.com>

Closes #580 from wujinhu/EAGLE-700.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/4ff963b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/4ff963b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/4ff963b4

Branch: refs/heads/master
Commit: 4ff963b47fd82cb1ca8394027518a9183f39176e
Parents: 4250e2d
Author: wujinhu <wu...@126.com>
Authored: Sat Oct 29 22:20:20 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Sat Oct 29 22:20:20 2016 +0800

----------------------------------------------------------------------
 .../eagle/app/service/ApplicationAction.java    | 81 +++++++++++---------
 .../src/main/bin/createTables.sql               | 20 +++++
 2 files changed, 65 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ff963b4/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index b7258d3..bd0adfe 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -88,45 +88,51 @@ public class ApplicationAction implements Serializable {
     }
 
     public void doInstall() {
-        if (metadata.getDescriptor().getStreams() != null) {
-            List<StreamDesc> streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
-                StreamDefinition copied = streamDefinition.copy();
-                copied.setSiteId(metadata.getSite().getSiteId());
-                copied.setStreamId(StreamIdConversions.formatSiteStreamId(metadata.getSite().getSiteId(), copied.getStreamId()));
-                StreamSinkConfig streamSinkConfig = this.runtime.environment()
+        processStreams();
+    }
+
+    private void processStreams() {
+        if (metadata.getDescriptor().getStreams() == null) {
+            return;
+        }
+
+        List<StreamDesc> streamDescToInstall = metadata.getDescriptor().getStreams().stream().map((streamDefinition -> {
+            StreamDefinition copied = streamDefinition.copy();
+            copied.setSiteId(metadata.getSite().getSiteId());
+            copied.setStreamId(StreamIdConversions.formatSiteStreamId(metadata.getSite().getSiteId(), copied.getStreamId()));
+            StreamSinkConfig streamSinkConfig = this.runtime.environment()
                     .streamSink().getSinkConfig(StreamIdConversions.parseStreamTypeId(copied.getSiteId(), copied.getStreamId()), this.effectiveConfig);
-                StreamDesc streamDesc = new StreamDesc();
-                streamDesc.setSchema(copied);
-                streamDesc.setSink(streamSinkConfig);
-                streamDesc.setStreamId(copied.getStreamId());
-                return streamDesc;
-            })).collect(Collectors.toList());
-            metadata.setStreams(streamDescToInstall);
+            StreamDesc streamDesc = new StreamDesc();
+            streamDesc.setSchema(copied);
+            streamDesc.setSink(streamSinkConfig);
+            streamDesc.setStreamId(copied.getStreamId());
+            return streamDesc;
+        })).collect(Collectors.toList());
+        metadata.setStreams(streamDescToInstall);
 
-            // TODO: Decouple converting from StreamSink to Alert DataSource
-            // iterate each stream descriptor and create alert datasource for each
-            for (StreamDesc streamDesc : streamDescToInstall) {
-                // only take care of Kafka sink
-                if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
-                    KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink();
-                    Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
-                    datasource.setType("KAFKA");
-                    datasource.setName(metadata.getAppId());
-                    datasource.setTopic(kafkaCfg.getTopicId());
-                    datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
-                    Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata();
-                    Properties prop = new Properties();
-                    prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId());
-                    tuple2Stream.setStreamNameSelectorProp(prop);
-                    tuple2Stream.setTimestampColumn("timestamp");
-                    tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName());
-                    datasource.setCodec(tuple2Stream);
-                    alertMetadataService.addDataSource(datasource);
+        // TODO: Decouple converting from StreamSink to Alert DataSource
+        // iterate each stream descriptor and create alert datasource for each
+        for (StreamDesc streamDesc : streamDescToInstall) {
+            // only take care of Kafka sink
+            if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
+                KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink();
+                Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+                datasource.setType("KAFKA");
+                datasource.setName(metadata.getAppId());
+                datasource.setTopic(kafkaCfg.getTopicId());
+                datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
+                Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata();
+                Properties prop = new Properties();
+                prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId());
+                tuple2Stream.setStreamNameSelectorProp(prop);
+                tuple2Stream.setTimestampColumn("timestamp");
+                tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName());
+                datasource.setCodec(tuple2Stream);
+                alertMetadataService.addDataSource(datasource);
 
-                    StreamDefinition sd = streamDesc.getSchema();
-                    sd.setDataSource(metadata.getAppId());
-                    alertMetadataService.createStream(streamDesc.getSchema());
-                }
+                StreamDefinition sd = streamDesc.getSchema();
+                sd.setDataSource(metadata.getAppId());
+                alertMetadataService.createStream(streamDesc.getSchema());
             }
         }
     }
@@ -144,6 +150,9 @@ public class ApplicationAction implements Serializable {
     }
 
     public void doStart() {
+        if (metadata.getStreams() == null) {
+            processStreams();
+        }
         this.runtime.start(this.application, this.effectiveConfig);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/4ff963b4/eagle-server-assembly/src/main/bin/createTables.sql
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/bin/createTables.sql b/eagle-server-assembly/src/main/bin/createTables.sql
index 749032b..da67d3d 100644
--- a/eagle-server-assembly/src/main/bin/createTables.sql
+++ b/eagle-server-assembly/src/main/bin/createTables.sql
@@ -38,4 +38,24 @@ CREATE TABLE IF NOT EXISTS sites (
   createdtime bigint(20) DEFAULT NULL,
   modifiedtime  bigint(20) DEFAULT NULL,
   UNIQUE (siteid)
+);
+
+CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity (
+  site varchar(20) DEFAULT NULL,
+  filedir varchar(100) DEFAULT NULL,
+  sensitivity_type varchar(20) DEFAULT NULL,
+  primary key (site, filedir)
+);
+
+CREATE TABLE IF NOT EXISTS ip_securityzone (
+  iphost varchar(100) DEFAULT NULL,
+  security_zone varchar(100) DEFAULT NULL,
+  primary key (iphost)
+);
+
+CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity (
+  site varchar(20) DEFAULT NULL,
+  hbase_resource varchar(100) DEFAULT NULL,
+  sensitivity_type varchar(20) DEFAULT NULL,
+  primary key (site, hbase_resource)
 );
\ No newline at end of file