You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/25 21:09:37 UTC
incubator-eagle git commit: EAGLE-493 Create alert metadata based on
application stream sink configuration
Repository: incubator-eagle
Updated Branches:
refs/heads/develop 7f3726716 -> c5d05abd1
EAGLE-493 Create alert metadata based on application stream sink configuration
Author: @yonzhang <yo...@apache.org>
Closes: #389
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/c5d05abd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/c5d05abd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/c5d05abd
Branch: refs/heads/develop
Commit: c5d05abd1d5f71d78dbe9a577a3975dc5b964d51
Parents: 7f37267
Author: yonzhang <yo...@gmail.com>
Authored: Thu Aug 25 14:13:45 2016 -0700
Committer: yonzhang <yo...@gmail.com>
Committed: Thu Aug 25 14:13:45 2016 -0700
----------------------------------------------------------------------
.../src/main/resources/application.conf | 2 +-
eagle-core/eagle-app/eagle-app-base/pom.xml | 5 ++
.../eagle/app/service/ApplicationContext.java | 49 ++++++++++++++++++--
.../impl/ApplicationManagementServiceImpl.java | 14 ++++--
.../apache/eagle/app/sink/KafkaStreamSink.java | 2 +-
...ecurity.auditlog.HdfsAuditLogAppProvider.xml | 6 +--
6 files changed, 63 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
index 60595b1..7030e45 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -48,7 +48,7 @@
"metadataService": {
"context" : "/rest",
"host" : "localhost",
- "port" : 8080
+ "port" : 9090
},
"coordinatorService": {
"host": "localhost",
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/pom.xml b/eagle-core/eagle-app/eagle-app-base/pom.xml
index d89b78c..12b056c 100644
--- a/eagle-core/eagle-app/eagle-app-base/pom.xml
+++ b/eagle-core/eagle-app/eagle-app-base/pom.xml
@@ -78,6 +78,11 @@
</exclusions>
</dependency>
<dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>alert-engine</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>${jersey.version}</version>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
index 91d33ca..52eb628 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationContext.java
@@ -19,18 +19,23 @@ package org.apache.eagle.app.service;
import com.google.common.base.Preconditions;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.scheme.JsonScheme;
+import org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector;
+import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.app.Application;
import org.apache.eagle.app.ApplicationLifecycle;
import org.apache.eagle.app.environment.ExecutionRuntime;
import org.apache.eagle.app.environment.ExecutionRuntimeManager;
+import org.apache.eagle.app.sink.KafkaStreamSinkConfig;
import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.eagle.metadata.model.StreamDesc;
import org.apache.eagle.metadata.model.StreamSinkConfig;
import java.io.Serializable;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.stream.Collectors;
/**
@@ -47,12 +52,13 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
private final Application application;
private final ExecutionRuntime runtime;
private final ApplicationEntity metadata;
+ private final IMetadataDao alertMetadataService;
/**
* @param metadata ApplicationEntity
* @param application Application
*/
- public ApplicationContext(Application application, ApplicationEntity metadata, Config envConfig){
+ public ApplicationContext(Application application, ApplicationEntity metadata, Config envConfig, IMetadataDao alertMetadataService){
Preconditions.checkNotNull(application,"Application is null");
Preconditions.checkNotNull(metadata,"ApplicationEntity is null");
this.application = application;
@@ -69,6 +75,7 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
executionConfig.put("appId", metadata.getAppId());
executionConfig.put("jarPath", metadata.getJarPath());
this.config = ConfigFactory.parseMap(executionConfig).withFallback(envConfig);
+ this.alertMetadataService = alertMetadataService;
}
@Override
@@ -83,12 +90,44 @@ public class ApplicationContext implements Serializable, ApplicationLifecycle {
return streamDesc;
})).collect(Collectors.toList());
metadata.setStreams(streamDescCollection);
+
+ // iterate each stream descriptor and create alert datasource for each
+ for(StreamDesc streamDesc : streamDescCollection) {
+ // only take care of Kafka sink
+ if (streamDesc.getSink() instanceof KafkaStreamSinkConfig) {
+ KafkaStreamSinkConfig kafkaCfg = (KafkaStreamSinkConfig) streamDesc.getSink();
+ Kafka2TupleMetadata datasource = new Kafka2TupleMetadata();
+ datasource.setType("KAFKA");
+ datasource.setName(metadata.getAppId());
+ datasource.setTopic(kafkaCfg.getTopicId());
+ datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
+ Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata();
+ Set<String> activeStreamNames = new HashSet<>();
+ activeStreamNames.add(streamDesc.getSchema().getStreamId());
+ tuple2Stream.setActiveStreamNames(activeStreamNames);
+ tuple2Stream.setTimestampColumn("timestamp");
+ tuple2Stream.setStreamNameSelectorCls(JsonStringStreamNameSelector.class.getCanonicalName());
+ datasource.setCodec(tuple2Stream);
+ alertMetadataService.addDataSource(datasource);
+
+ StreamDefinition sd = streamDesc.getSchema();
+ sd.setDataSource(metadata.getAppId());
+ alertMetadataService.createStream(streamDesc.getSchema());
+ }
+ }
}
}
@Override
public void onUninstall() {
- //
+ // remove the alert data source and stream definitions when the application is uninstalled
+ if(metadata.getStreams() == null)
+ return;
+ // iterate each stream descriptor and remove its alert datasource and stream definition
+ for(StreamDesc streamDesc : metadata.getStreams()) {
+ alertMetadataService.removeDataSource(metadata.getAppId());
+ alertMetadataService.removeStream(streamDesc.getStreamId());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
index 314b0fb..c355a10 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/impl/ApplicationManagementServiceImpl.java
@@ -21,6 +21,7 @@ import com.google.inject.Inject;
import com.google.inject.Singleton;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.metadata.IMetadataDao;
import org.apache.eagle.app.service.ApplicationContext;
import org.apache.eagle.app.service.ApplicationOperations;
import org.apache.eagle.app.service.ApplicationManagementService;
@@ -45,6 +46,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
private final SiteEntityService siteEntityService;
private final ApplicationProviderService applicationProviderService;
private final ApplicationEntityService applicationEntityService;
+ private final IMetadataDao alertMetadataService;
private final Config config;
private final static Logger LOGGER = LoggerFactory.getLogger(ApplicationManagementServiceImpl.class);
@@ -53,11 +55,13 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
Config config,
SiteEntityService siteEntityService,
ApplicationProviderService applicationProviderService,
- ApplicationEntityService applicationEntityService){
+ ApplicationEntityService applicationEntityService,
+ IMetadataDao alertMetadataService){
this.config = config;
this.siteEntityService = siteEntityService;
this.applicationProviderService = applicationProviderService;
this.applicationEntityService = applicationEntityService;
+ this.alertMetadataService = alertMetadataService;
}
@Override
@@ -97,7 +101,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
applicationEntity.setConfiguration(appConfig);
ApplicationContext applicationContext = new ApplicationContext(
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
- applicationEntity,config);
+ applicationEntity,config, alertMetadataService);
applicationContext.onInstall();
return applicationEntityService.create(applicationEntity);
}
@@ -107,7 +111,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
ApplicationContext applicationContext = new ApplicationContext(
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
- applicationEntity,config);
+ applicationEntity,config, alertMetadataService);
// TODO: Check status, skip stop if already STOPPED
try {
applicationContext.onStop();
@@ -123,7 +127,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
ApplicationContext applicationContext = new ApplicationContext(
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
- applicationEntity,config);
+ applicationEntity,config, alertMetadataService);
applicationContext.onStart();
return applicationEntity;
}
@@ -133,7 +137,7 @@ public class ApplicationManagementServiceImpl implements ApplicationManagementSe
ApplicationEntity applicationEntity = applicationEntityService.getByUUIDOrAppId(operation.getUuid(),operation.getAppId());
ApplicationContext applicationContext = new ApplicationContext(
applicationProviderService.getApplicationProviderByType(applicationEntity.getDescriptor().getType()).getApplication(),
- applicationEntity,config);
+ applicationEntity,config, alertMetadataService);
applicationContext.onStop();
return applicationEntity;
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
index 5c33c94..27848d9 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/sink/KafkaStreamSink.java
@@ -57,7 +57,7 @@ public class KafkaStreamSink extends StormStreamSink<KafkaStreamSinkConfig> {
protected void execute(Object key, Map event,BasicOutputCollector collector) {
try {
String output = new ObjectMapper().writeValueAsString(event);
- producer.send(new KeyedMessage(this.topicId, event.get("user"), output));
+ producer.send(new KeyedMessage(this.topicId, key, output));
}catch(Exception ex){
LOG.error(ex.getMessage(), ex);
collector.reportError(ex);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/c5d05abd/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
index 2419747..2a8ff0f 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -117,7 +117,7 @@
<property>
<name>dataSinkConfig.topic</name>
<displayName>dataSinkConfig.topic</displayName>
- <value>hdfs_audit_log_parsed</value>
+ <value>hdfs_audit_log_enriched</value>
<description>topic for kafka data sink</description>
</property>
<property>
@@ -149,8 +149,8 @@
</configuration>
<streams>
<stream>
- <streamId>hdfs_audit_log_stream</streamId>
- <description>Hdfs Audit Log Stream</description>
+ <streamId>hdfs_audit_log_enriched_stream</streamId>
+ <description>Hdfs Audit Log Enriched Stream</description>
<validate>true</validate>
<timeseries>true</timeseries>
<columns>