You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ji...@apache.org on 2016/12/14 09:26:12 UTC
incubator-eagle git commit: [EAGLE-841] CorrelationSpout reads zk
connection from datasource if exists
Repository: incubator-eagle
Updated Branches:
refs/heads/master 880ba738c -> 7639ff223
[EAGLE-841] CorrelationSpout reads zk connection from datasource if exists
Author: wujinhu <wu...@126.com>
Closes #741 from wujinhu/EAGLE_841.
Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/7639ff22
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/7639ff22
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/7639ff22
Branch: refs/heads/master
Commit: 7639ff2237352884c76e862fd14826cb053bb0fc
Parents: 880ba73
Author: wujinhu <wu...@126.com>
Authored: Wed Dec 14 17:26:05 2016 +0800
Committer: wujinhu <wu...@126.com>
Committed: Wed Dec 14 17:26:05 2016 +0800
----------------------------------------------------------------------
.../eagle/alert/utils/AlertConstants.java | 3 ++
.../alert/engine/spout/CorrelationSpout.java | 38 +++++++++++---------
.../engine/topology/CorrelationSpoutTest.java | 8 +++--
.../eagle/app/service/ApplicationAction.java | 10 ++++++
.../src/main/bin/metadata-ddl.sql | 10 +++---
5 files changed, 44 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
index ee2c28c..2740836 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/utils/AlertConstants.java
@@ -28,4 +28,7 @@ public class AlertConstants {
public static final String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
public static final String COORDINATOR = "coordinator";
+
+ public static final String KAFKA_BROKER_ZK_BASE_PATH = "spout.kafkaBrokerZkBasePath";
+ public static final String KAFKA_BROKER_ZK_QUORUM = "spout.kafkaBrokerZkQuorum";
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
index 63e94ca..60a9b98 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/spout/CorrelationSpout.java
@@ -27,6 +27,7 @@ import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
import org.apache.commons.collections.CollectionUtils;
import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
import org.apache.eagle.alert.coordination.model.SpoutSpec;
@@ -235,8 +236,10 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
// build lookup table for scheme
Map<String, String> newSchemaName = new HashMap<String, String>();
+ Map<String, Map<String, String>> dataSourceProperties = new HashMap<>();
for (Kafka2TupleMetadata ds : newMeta.getKafka2TupleMetadataMap().values()) {
newSchemaName.put(ds.getTopic(), ds.getSchemeCls());
+ dataSourceProperties.put(ds.getTopic(), ds.getProperties());
}
// copy and swap
@@ -248,7 +251,8 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
LOG.warn(MessageFormat.format("try to create new topic {0}, but found in the active spout list, this may indicate some inconsistency", topic));
continue;
}
- KafkaSpoutWrapper newWrapper = createKafkaSpout(conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds);
+ KafkaSpoutWrapper newWrapper = createKafkaSpout(ConfigFactory.parseMap(dataSourceProperties.get(topic)).withFallback(this.config),
+ conf, context, collector, topic, newSchemaName.get(topic), newMeta, sds);
newKafkaSpoutList.put(topic, newWrapper);
}
// iterate remove topics and then close KafkaSpout
@@ -297,47 +301,47 @@ public class CorrelationSpout extends BaseRichSpout implements SpoutSpecListener
* @return
*/
@SuppressWarnings("rawtypes")
- protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
+ protected KafkaSpoutWrapper createKafkaSpout(Config configure, Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
String schemeClsName, SpoutSpec spoutSpec, Map<String, StreamDefinition> sds) throws Exception {
- String kafkaBrokerZkQuorum = config.getString("spout.kafkaBrokerZkQuorum");
+ String kafkaBrokerZkQuorum = configure.getString(AlertConstants.KAFKA_BROKER_ZK_QUORUM);
BrokerHosts hosts = null;
- if (config.hasPath("spout.kafkaBrokerZkBasePath")) {
- hosts = new ZkHosts(kafkaBrokerZkQuorum, config.getString("spout.kafkaBrokerZkBasePath"));
+ if (configure.hasPath("spout.kafkaBrokerZkBasePath")) {
+ hosts = new ZkHosts(kafkaBrokerZkQuorum, configure.getString(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH));
} else {
hosts = new ZkHosts(kafkaBrokerZkQuorum);
}
String transactionZkRoot = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_ROOT;
- if (config.hasPath("spout.stormKafkaTransactionZkPath")) {
- transactionZkRoot = config.getString("spout.stormKafkaTransactionZkPath");
+ if (configure.hasPath("spout.stormKafkaTransactionZkPath")) {
+ transactionZkRoot = configure.getString("spout.stormKafkaTransactionZkPath");
}
boolean logEventEnabled = false;
- if (config.hasPath("topology.logEventEnabled")) {
- logEventEnabled = config.getBoolean("topology.logEventEnabled");
+ if (configure.hasPath("topology.logEventEnabled")) {
+ logEventEnabled = configure.getBoolean("topology.logEventEnabled");
}
// write partition offset etc. into zkRoot+id, see PartitionManager.committedPath
String zkStateTransactionRelPath = DEFAULT_STORM_KAFKA_TRANSACTION_ZK_RELATIVE_PATH;
- if (config.hasPath("spout.stormKafkaEagleConsumer")) {
- zkStateTransactionRelPath = config.getString("spout.stormKafkaEagleConsumer");
+ if (configure.hasPath("spout.stormKafkaEagleConsumer")) {
+ zkStateTransactionRelPath = configure.getString("spout.stormKafkaEagleConsumer");
}
SpoutConfig spoutConfig = new SpoutConfig(hosts, topic, transactionZkRoot, zkStateTransactionRelPath + "/" + topic + "/" + topologyId);
// transaction zkServers
- boolean stormKafkaUseSameZkQuorumWithKafkaBroker = config.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
+ boolean stormKafkaUseSameZkQuorumWithKafkaBroker = configure.getBoolean("spout.stormKafkaUseSameZkQuorumWithKafkaBroker");
if (stormKafkaUseSameZkQuorumWithKafkaBroker) {
ZkServerPortUtils utils = new ZkServerPortUtils(kafkaBrokerZkQuorum);
spoutConfig.zkServers = utils.getZkHosts();
spoutConfig.zkPort = utils.getZkPort();
} else {
- ZkServerPortUtils utils = new ZkServerPortUtils(config.getString("spout.stormKafkaTransactionZkQuorum"));
+ ZkServerPortUtils utils = new ZkServerPortUtils(configure.getString("spout.stormKafkaTransactionZkQuorum"));
spoutConfig.zkServers = utils.getZkHosts();
spoutConfig.zkPort = utils.getZkPort();
}
// transaction update interval
- spoutConfig.stateUpdateIntervalMs = config.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? config.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000;
+ spoutConfig.stateUpdateIntervalMs = configure.hasPath("spout.stormKafkaStateUpdateIntervalMs") ? configure.getInt("spout.stormKafkaStateUpdateIntervalMs") : 2000;
// Kafka fetch size
- spoutConfig.fetchSizeBytes = config.hasPath("spout.stormKafkaFetchSizeBytes") ? config.getInt("spout.stormKafkaFetchSizeBytes") : 1048586;
+ spoutConfig.fetchSizeBytes = configure.hasPath("spout.stormKafkaFetchSizeBytes") ? configure.getInt("spout.stormKafkaFetchSizeBytes") : 1048586;
// "startOffsetTime" is for test usage, prod should not use this
- if (config.hasPath("spout.stormKafkaStartOffsetTime")) {
- spoutConfig.startOffsetTime = config.getInt("spout.stormKafkaStartOffsetTime");
+ if (configure.hasPath("spout.stormKafkaStartOffsetTime")) {
+ spoutConfig.startOffsetTime = configure.getInt("spout.stormKafkaStartOffsetTime");
}
spoutConfig.scheme = createMultiScheme(conf, topic, schemeClsName);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
index 9deb4b2..5a86cd2 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
@@ -56,7 +56,7 @@ public class CorrelationSpoutTest {
AtomicBoolean validated = new AtomicBoolean(false);
CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
@Override
- protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context,
+ protected KafkaSpoutWrapper createKafkaSpout(Config config, Map conf, TopologyContext context,
SpoutOutputCollector collector, String topic, String schemeClsName, SpoutSpec streamMetadatas, Map<String, StreamDefinition> sds)
throws Exception {
validated.set(true);
@@ -94,7 +94,8 @@ public class CorrelationSpoutTest {
final AtomicBoolean verified = new AtomicBoolean(false);
CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
@Override
- protected KafkaSpoutWrapper createKafkaSpout(Map conf,
+ protected KafkaSpoutWrapper createKafkaSpout(Config config,
+ Map conf,
TopologyContext context,
SpoutOutputCollector collector,
String topic,
@@ -147,7 +148,8 @@ public class CorrelationSpoutTest {
final AtomicBoolean verified = new AtomicBoolean(false);
CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1) {
@Override
- protected KafkaSpoutWrapper createKafkaSpout(Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
+ protected KafkaSpoutWrapper createKafkaSpout(Config config,
+ Map conf, TopologyContext context, SpoutOutputCollector collector, final String topic,
String schemeClsName, SpoutSpec streamMetadatas,
Map<String, StreamDefinition> sds) {
return new KafkaSpoutWrapper(null, null);
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
index 8c7c8d6..a502f81 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/service/ApplicationAction.java
@@ -31,6 +31,7 @@ import org.apache.eagle.app.Application;
import org.apache.eagle.app.environment.ExecutionRuntime;
import org.apache.eagle.app.environment.ExecutionRuntimeManager;
import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
+import org.apache.eagle.app.messaging.KafkaStreamSourceConfig;
import org.apache.eagle.metadata.model.ApplicationEntity;
import org.apache.eagle.metadata.model.StreamSourceConfig;
import org.apache.eagle.metadata.utils.StreamIdConversions;
@@ -144,6 +145,15 @@ public class ApplicationAction implements Serializable {
datasource.setName(metadata.getAppId());
datasource.setTopic(kafkaCfg.getTopicId());
datasource.setSchemeCls(JsonScheme.class.getCanonicalName());
+ datasource.setProperties(new HashMap<>());
+
+ KafkaStreamSourceConfig streamSourceConfig = (KafkaStreamSourceConfig) streamDesc.getSourceConfig();
+ if (streamSourceConfig != null) {
+ Map<String, String> properties = datasource.getProperties();
+ properties.put(AlertConstants.KAFKA_BROKER_ZK_BASE_PATH, streamSourceConfig.getBrokerZkPath());
+ properties.put(AlertConstants.KAFKA_BROKER_ZK_QUORUM, streamSourceConfig.getBrokerZkQuorum());
+ }
+
Tuple2StreamMetadata tuple2Stream = new Tuple2StreamMetadata();
Properties prop = new Properties();
prop.put(JsonStringStreamNameSelector.USER_PROVIDED_STREAM_NAME_PROPERTY, streamDesc.getStreamId());
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/7639ff22/eagle-server-assembly/src/main/bin/metadata-ddl.sql
----------------------------------------------------------------------
diff --git a/eagle-server-assembly/src/main/bin/metadata-ddl.sql b/eagle-server-assembly/src/main/bin/metadata-ddl.sql
index f80a7ea..4bed927 100644
--- a/eagle-server-assembly/src/main/bin/metadata-ddl.sql
+++ b/eagle-server-assembly/src/main/bin/metadata-ddl.sql
@@ -46,21 +46,21 @@ CREATE TABLE IF NOT EXISTS sites (
-- eagle security module metadata
CREATE TABLE IF NOT EXISTS hdfs_sensitivity_entity (
- site varchar(20) DEFAULT NULL,
- filedir varchar(100) DEFAULT NULL,
+ site varchar(20) NOT NULL,
+ filedir varchar(100) NOT NULL,
sensitivity_type varchar(20) DEFAULT NULL,
primary key (site, filedir)
);
CREATE TABLE IF NOT EXISTS ip_securityzone (
- iphost varchar(100) DEFAULT NULL,
+ iphost varchar(100) NOT NULL,
security_zone varchar(100) DEFAULT NULL,
primary key (iphost)
);
CREATE TABLE IF NOT EXISTS hbase_sensitivity_entity (
- site varchar(20) DEFAULT NULL,
- hbase_resource varchar(100) DEFAULT NULL,
+ site varchar(20) NOT NULL,
+ hbase_resource varchar(100) NOT NULL,
sensitivity_type varchar(20) DEFAULT NULL,
primary key (site, hbase_resource)
);