Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/12 03:40:29 UTC
[2/3] incubator-eagle git commit: hdfs, hbase, mapr app conversion
Author: Yong Zhang
Closes: #334
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
new file mode 100644
index 0000000..ccbce98
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/AbstractHdfsAuditLogApplication.java
@@ -0,0 +1,117 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.generated.StormTopology;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IRichSpout;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.commons.lang3.time.DateUtils;
+import org.apache.eagle.app.StormApplication;
+import org.apache.eagle.app.environment.impl.StormEnvironment;
+import org.apache.eagle.app.sink.StormStreamSink;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.impl.storm.partition.CustomPartitionGrouping;
+import org.apache.eagle.partition.DataDistributionDao;
+import org.apache.eagle.partition.PartitionAlgorithm;
+import org.apache.eagle.partition.PartitionStrategy;
+import org.apache.eagle.partition.PartitionStrategyImpl;
+import org.apache.eagle.security.partition.DataDistributionDaoImpl;
+import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
+import org.apache.eagle.security.topo.NewKafkaSourcedSpoutProvider;
+import storm.kafka.StringScheme;
+
+/**
+ * Since 8/10/16.
+ */
+public abstract class AbstractHdfsAuditLogApplication extends StormApplication {
+ public final static String SPOUT_TASK_NUM = "topology.numOfSpoutTasks";
+ public final static String PARSER_TASK_NUM = "topology.numOfParserTasks";
+ public final static String SENSITIVITY_JOIN_TASK_NUM = "topology.numOfSensitivityJoinTasks";
+ public final static String IPZONE_JOIN_TASK_NUM = "topology.numOfIPZoneJoinTasks";
+ public final static String SINK_TASK_NUM = "topology.numOfSinkTasks";
+
+ @Override
+ public StormTopology execute(Config config, StormEnvironment environment) {
+ TopologyBuilder builder = new TopologyBuilder();
+ NewKafkaSourcedSpoutProvider provider = new NewKafkaSourcedSpoutProvider();
+ IRichSpout spout = provider.getSpout(config);
+
+ int numOfSpoutTasks = config.getInt(SPOUT_TASK_NUM);
+ int numOfParserTasks = config.getInt(PARSER_TASK_NUM);
+ int numOfSensitivityJoinTasks = config.getInt(SENSITIVITY_JOIN_TASK_NUM);
+ int numOfIPZoneJoinTasks = config.getInt(IPZONE_JOIN_TASK_NUM);
+ int numOfSinkTasks = config.getInt(SINK_TASK_NUM);
+
+ builder.setSpout("ingest", spout, numOfSpoutTasks);
+
+
+ HdfsAuditLogParserBolt parserBolt = new HdfsAuditLogParserBolt();
+ BoltDeclarer boltDeclarer = builder.setBolt("parserBolt", parserBolt, numOfParserTasks);
+
+ Boolean useDefaultPartition = !config.hasPath("eagleProps.useDefaultPartition") || config.getBoolean("eagleProps.useDefaultPartition");
+ if(useDefaultPartition){
+ boltDeclarer.fieldsGrouping("ingest", new Fields(StringScheme.STRING_SCHEME_KEY));
+ }else{
+ boltDeclarer.customGrouping("ingest", new CustomPartitionGrouping(createStrategy(config)));
+ }
+
+ FileSensitivityDataJoinBolt sensitivityDataJoinBolt = new FileSensitivityDataJoinBolt(config);
+ BoltDeclarer sensitivityDataJoinBoltDeclarer = builder.setBolt("sensitivityJoin", sensitivityDataJoinBolt, numOfSensitivityJoinTasks);
+ sensitivityDataJoinBoltDeclarer.fieldsGrouping("parserBolt", new Fields("f1"));
+
+ IPZoneDataJoinBolt ipZoneDataJoinBolt = new IPZoneDataJoinBolt(config);
+ BoltDeclarer ipZoneDataJoinBoltDeclarer = builder.setBolt("ipZoneJoin", ipZoneDataJoinBolt, numOfIPZoneJoinTasks);
+ ipZoneDataJoinBoltDeclarer.fieldsGrouping("sensitivityJoin", new Fields("user"));
+
+ StormStreamSink sinkBolt = environment.getStreamSink("hdfs_audit_log_stream",config);
+ BoltDeclarer kafkaBoltDeclarer = builder.setBolt("kafkaSink", sinkBolt, numOfSinkTasks);
+ kafkaBoltDeclarer.fieldsGrouping("ipZoneJoin", new Fields("user"));
+ return builder.createTopology();
+
+
+ }
+
+ public abstract BaseRichBolt getParserBolt();
+ public abstract String getSinkStreamName();
+
+ public static PartitionStrategy createStrategy(Config config) {
+ // TODO: Refactor configuration structure to avoid repeated config processing configure ~ hao
+ String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+ Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+ String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+ String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+ String topic = config.getString("dataSourceConfig.topic");
+ DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
+ PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
+ String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
+ Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
+ String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
+ Integer kafkaStatisticRangeInMin = config.hasPath(key2) ? config.getInt(key2) : 60;
+ PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
+ return strategy;
+ }
+
+
+}
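The task-count constants above are plain Typesafe Config paths that execute() reads with config.getInt(). A minimal sketch of an in-memory config satisfying those lookups, with illustrative values only (not the defaults shipped with the app):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    // Sketch only: an in-memory Config carrying the keys read by
    // AbstractHdfsAuditLogApplication.execute(); all values are illustrative.
    public class HdfsAuditTopologyConfigSketch {
        public static Config sampleConfig() {
            return ConfigFactory.parseString(
                    "topology.numOfSpoutTasks = 2\n"
                  + "topology.numOfParserTasks = 2\n"
                  + "topology.numOfSensitivityJoinTasks = 2\n"
                  + "topology.numOfIPZoneJoinTasks = 2\n"
                  + "topology.numOfSinkTasks = 2\n"
                  + "eagleProps.useDefaultPartition = true\n");
        }
    }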
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
new file mode 100644
index 0000000..6cbbde6
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinBolt.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob;
+import org.apache.eagle.security.auditlog.util.SimplifyPath;
+import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
+import org.apache.eagle.security.util.ExternalDataCache;
+import org.apache.eagle.security.util.ExternalDataJoiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class FileSensitivityDataJoinBolt extends BaseRichBolt {
+ private static Logger LOG = LoggerFactory.getLogger(FileSensitivityDataJoinBolt.class);
+ private Config config;
+ private OutputCollector collector;
+
+ public FileSensitivityDataJoinBolt(Config config){
+ this.config = config;
+ }
+
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ // start hdfs sensitivity data polling
+ try{
+ ExternalDataJoiner joiner = new ExternalDataJoiner(
+ FileSensitivityPollingJob.class, config, context.getThisComponentId() + "." + context.getThisTaskIndex());
+ joiner.start();
+ }catch(Exception ex){
+ LOG.error("Fail bringing up quartz scheduler", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ try {
+ Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(0);
+ Map<String, Object> event = new TreeMap<String, Object>(toBeCopied);
+ Map<String, FileSensitivityAPIEntity> map =
+ (Map<String, FileSensitivityAPIEntity>) ExternalDataCache.getInstance().
+ getJobResult(FileSensitivityPollingJob.class);
+ FileSensitivityAPIEntity e = null;
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Receive map: " + map + "event: " + event);
+ }
+
+ String src = (String) event.get("src");
+ if (map != null && src != null) {
+ String simplifiedPath = new SimplifyPath().build(src);
+ for (String fileDir : map.keySet()) {
+ Pattern pattern = Pattern.compile(simplifiedPath, Pattern.CASE_INSENSITIVE);
+ Matcher matcher = pattern.matcher(fileDir);
+ boolean isMatched = matcher.matches();
+ if (isMatched) {
+ e = map.get(fileDir);
+ break;
+ }
+ }
+ }
+ event.put("sensitivityType", e == null ? "NA" : e.getSensitivityType());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("After file sensitivity lookup: " + event);
+ }
+ // LOG.info(">>>> After file sensitivity lookup: " + event);
+ collector.emit(Arrays.asList(event.get("user"), event));
+ }catch(Exception ex){
+ LOG.error("error joining data, ignore it", ex);
+ }finally {
+ collector.ack(input);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("user", "message"));
+ }
+}
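The join above builds a regex from the event's "src" path (via SimplifyPath) and matches it against each configured sensitive directory key, falling back to "NA" when nothing matches. A standalone sketch of that lookup direction, using a plain String in place of FileSensitivityAPIEntity and a hard-coded pattern in place of SimplifyPath.build(src); both are assumptions for illustration:

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    // Standalone sketch of the sensitivity lookup: the regex comes from the event's
    // src path and is matched against each configured sensitive directory.
    public class SensitivityLookupSketch {
        public static void main(String[] args) {
            Map<String, String> sensitivityByDir = new LinkedHashMap<>();
            sensitivityByDir.put("/tmp/private", "PRIVATE");
            sensitivityByDir.put("/data/pii", "PII");

            String simplifiedPath = "/tmp/private(/.*)?"; // assumed output of SimplifyPath for src=/tmp/private/a.txt
            String sensitivityType = "NA";
            for (String fileDir : sensitivityByDir.keySet()) {
                if (Pattern.compile(simplifiedPath, Pattern.CASE_INSENSITIVE).matcher(fileDir).matches()) {
                    sensitivityType = sensitivityByDir.get(fileDir);
                    break;
                }
            }
            System.out.println("sensitivityType=" + sensitivityType); // prints PRIVATE
        }
    }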
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
deleted file mode 100644
index 33d29d0..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/FileSensitivityDataJoinExecutor.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.auditlog;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob;
-import org.apache.eagle.security.auditlog.util.SimplifyPath;
-import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
-import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.util.ExternalDataJoiner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-public class FileSensitivityDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> {
- private static final Logger LOG = LoggerFactory.getLogger(FileSensitivityDataJoinExecutor.class);
- private Config config;
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void init() {
- // start IPZone data polling
- try{
- ExternalDataJoiner joiner = new ExternalDataJoiner(FileSensitivityPollingJob.class, config, "1");
- joiner.start();
- }catch(Exception ex){
- LOG.error("Fail bring up quartz scheduler", ex);
- throw new IllegalStateException(ex);
- }
- }
-
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
- Map<String, Object> toBeCopied = (Map<String, Object>)input.get(1);
- Map<String, Object> event = new TreeMap<String, Object>(toBeCopied);
- Map<String, FileSensitivityAPIEntity> map = (Map<String, FileSensitivityAPIEntity>) ExternalDataCache.getInstance().getJobResult(FileSensitivityPollingJob.class);
- FileSensitivityAPIEntity e = null;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Receive map: " + map + "event: " + event);
- }
-
- String src = (String)event.get("src");
- if(map != null && src != null) {
- String simplifiedPath = new SimplifyPath().build(src);
- for (String fileDir : map.keySet()) {
- Pattern pattern = Pattern.compile(simplifiedPath,Pattern.CASE_INSENSITIVE);
- Matcher matcher = pattern.matcher(fileDir);
- boolean isMatched = matcher.matches();
- if (isMatched) {
- e = map.get(fileDir);
- break;
- }
- }
- }
- event.put("sensitivityType", e == null ? "NA" : e.getSensitivityType());
- if(LOG.isDebugEnabled()) {
- LOG.debug("After file sensitivity lookup: " + event);
- }
- // LOG.info(">>>> After file sensitivity lookup: " + event);
- outputCollector.collect(new Tuple2(event.get("user"), event));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java
new file mode 100644
index 0000000..fcf9d4f
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HDFSAuditLogAppProvider.java
@@ -0,0 +1,34 @@
+/*
+ *
+ * *
+ * * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * * contributor license agreements. See the NOTICE file distributed with
+ * * * this work for additional information regarding copyright ownership.
+ * * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * * (the "License"); you may not use this file except in compliance with
+ * * * the License. You may obtain a copy of the License at
+ * * * <p/>
+ * * * http://www.apache.org/licenses/LICENSE-2.0
+ * * * <p/>
+ * * * Unless required by applicable law or agreed to in writing, software
+ * * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * * See the License for the specific language governing permissions and
+ * * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import org.apache.eagle.app.spi.AbstractApplicationProvider;
+
+/**
+ * Since 8/11/16.
+ */
+public class HdfsAuditLogAppProvider extends AbstractApplicationProvider<HdfsAuditLogApplication> {
+ @Override
+ public HdfsAuditLogApplication getApplication() {
+ return new HdfsAuditLogApplication();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
new file mode 100644
index 0000000..791572b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogApplication.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * *
+ * * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * * contributor license agreements. See the NOTICE file distributed with
+ * * * this work for additional information regarding copyright ownership.
+ * * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * * (the "License"); you may not use this file except in compliance with
+ * * * the License. You may obtain a copy of the License at
+ * * * <p/>
+ * * * http://www.apache.org/licenses/LICENSE-2.0
+ * * * <p/>
+ * * * Unless required by applicable law or agreed to in writing, software
+ * * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * * See the License for the specific language governing permissions and
+ * * * limitations under the License.
+ * *
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.topology.base.BaseRichBolt;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Since 8/11/16.
+ */
+public class HdfsAuditLogApplication extends AbstractHdfsAuditLogApplication {
+ @Override
+ public BaseRichBolt getParserBolt() {
+ return new HdfsAuditLogParserBolt();
+ }
+
+ @Override
+ public String getSinkStreamName() {
+ return "hdfs_audit_log_stream";
+ }
+
+ public static void main(String[] args){
+ Config config = ConfigFactory.load();
+ HdfsAuditLogApplication app = new HdfsAuditLogApplication();
+ app.run(config);
+ }
+}
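HdfsAuditLogApplication is the first concrete subclass of the abstract topology; the hbase and mapr conversions named in the commit subject follow the same shape. A hypothetical sketch of another subclass is below (class and stream names are assumptions; note that execute() above still constructs HdfsAuditLogParserBolt directly, so the override only takes effect once execute() is wired through getParserBolt()/getSinkStreamName()):

    package org.apache.eagle.security.auditlog;

    import backtype.storm.topology.base.BaseRichBolt;

    // Hypothetical sketch: another audit-log source plugging into the same abstract
    // topology. A source-specific parser bolt would replace HdfsAuditLogParserBolt.
    public class OtherAuditLogApplicationSketch extends AbstractHdfsAuditLogApplication {
        @Override
        public BaseRichBolt getParserBolt() {
            return new HdfsAuditLogParserBolt(); // placeholder for a source-specific parser
        }

        @Override
        public String getSinkStreamName() {
            return "other_audit_log_stream"; // assumed stream name
        }
    }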
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
deleted file mode 100644
index 08ab993..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogKafkaDeserializer.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.auditlog;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.TreeMap;
-
-import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.dataproc.impl.storm.kafka.SpoutKafkaMessageDeserializer;
-import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
-import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
-
-public class HdfsAuditLogKafkaDeserializer implements SpoutKafkaMessageDeserializer{
- private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogKafkaDeserializer.class);
- private Properties props;
-
- public HdfsAuditLogKafkaDeserializer(Properties props){
- this.props = props;
- }
-
- /**
- * the steps for deserializing message from kafka
- * 1. convert byte array to string
- * 2. parse string to eagle entity
- */
- @Override
- public Object deserialize(byte[] arg0) {
- String logLine = new String(arg0);
-
- HDFSAuditLogParser parser = new HDFSAuditLogParser();
- HDFSAuditLogObject entity = null;
- try{
- entity = parser.parse(logLine);
- }catch(Exception ex){
- LOG.error("Failing parse audit log message", ex);
- }
- if(entity == null){
- LOG.warn("Event ignored as it can't be correctly parsed, the log is ", logLine);
- return null;
- }
- Map<String, Object> map = new TreeMap<String, Object>();
- map.put("src", entity.src);
- map.put("dst", entity.dst);
- map.put("host", entity.host);
- map.put("timestamp", entity.timestamp);
- map.put("allowed", entity.allowed);
- map.put("user", entity.user);
- map.put("cmd", entity.cmd);
-
- return map;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
deleted file mode 100644
index a7f207e..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogMonitoringTopology.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.security.auditlog;
-
-
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.stream.application.TopologyExecutable;
-
-public class HdfsAuditLogMonitoringTopology implements TopologyExecutable {
- @Override
- public void submit(String topology, Config config) {
- StormExecutionEnvironment env = ExecutionEnvironments.getStorm(config);
- KafkaSourcedSpoutProvider provider = HdfsAuditLogProcessorMain.createProvider(env.getConfig());
- Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled");
- if (balancePartition) {
- HdfsAuditLogProcessorMain.execWithBalancedPartition(env, provider);
- } else {
- HdfsAuditLogProcessorMain.execWithDefaultPartition(env, provider);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
new file mode 100644
index 0000000..5ea5950
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogParserBolt.java
@@ -0,0 +1,77 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * * <p/>
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * * <p/>
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
+import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Since 8/10/16.
+ */
+public class HdfsAuditLogParserBolt extends BaseRichBolt {
+ private static Logger LOG = LoggerFactory.getLogger(HdfsAuditLogParserBolt.class);
+ private OutputCollector collector;
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ String logLine = input.getString(0);
+
+ HDFSAuditLogParser parser = new HDFSAuditLogParser();
+ HDFSAuditLogObject entity = null;
+ try{
+ entity = parser.parse(logLine);
+ Map<String, Object> map = new TreeMap<String, Object>();
+ map.put("src", entity.src);
+ map.put("dst", entity.dst);
+ map.put("host", entity.host);
+ map.put("timestamp", entity.timestamp);
+ map.put("allowed", entity.allowed);
+ map.put("user", entity.user);
+ map.put("cmd", entity.cmd);
+ collector.emit(Arrays.asList(map));
+ }catch(Exception ex){
+ LOG.error("Failing parse audit log message", ex);
+ }finally {
+ collector.ack(input);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("f1"));
+ }
+}
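The parser bolt delegates the parsing to HDFSAuditLogParser and re-emits the fields as a single map on stream field "f1". A standalone sketch of that parse step outside Storm; the sample line follows the usual HDFS audit-log layout and is an assumption:

    import java.util.Map;
    import java.util.TreeMap;
    import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
    import org.apache.eagle.security.hdfs.HDFSAuditLogParser;

    // Sketch only: parse one audit line the way HdfsAuditLogParserBolt.execute() does.
    public class ParserSketch {
        public static void main(String[] args) throws Exception {
            String logLine = "2016-08-10 11:00:00,000 INFO FSNamesystem.audit: "
                    + "allowed=true ugi=hdfs ip=/192.168.56.101 cmd=open "
                    + "src=/tmp/private/a.txt dst=null perm=null"; // assumed sample line
            HDFSAuditLogObject entity = new HDFSAuditLogParser().parse(logLine);
            Map<String, Object> map = new TreeMap<>();
            map.put("src", entity.src);
            map.put("user", entity.user);
            map.put("cmd", entity.cmd);
            map.put("timestamp", entity.timestamp);
            System.out.println(map);
        }
    }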
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
deleted file mode 100644
index 60b0e36..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/HdfsAuditLogProcessorMain.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-package org.apache.eagle.security.auditlog;
-
-import backtype.storm.spout.SchemeAsMultiScheme;
-import com.typesafe.config.Config;
-import org.apache.commons.lang3.time.DateUtils;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutProvider;
-import org.apache.eagle.dataproc.impl.storm.kafka.KafkaSourcedSpoutScheme;
-import org.apache.eagle.datastream.ExecutionEnvironments;
-import org.apache.eagle.datastream.core.StreamProducer;
-import org.apache.eagle.datastream.storm.StormExecutionEnvironment;
-import org.apache.eagle.partition.DataDistributionDao;
-import org.apache.eagle.partition.PartitionAlgorithm;
-import org.apache.eagle.partition.PartitionStrategy;
-import org.apache.eagle.partition.PartitionStrategyImpl;
-import org.apache.eagle.security.partition.DataDistributionDaoImpl;
-import org.apache.eagle.security.partition.GreedyPartitionAlgorithm;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
-public class HdfsAuditLogProcessorMain {
- public static PartitionStrategy createStrategy(Config config) {
- // TODO: Refactor configuration structure to avoid repeated config processing configure ~ hao
- String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
- String username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
- String password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
- String topic = config.getString("dataSourceConfig.topic");
- DataDistributionDao dao = new DataDistributionDaoImpl(host, port, username, password, topic);
- PartitionAlgorithm algorithm = new GreedyPartitionAlgorithm();
- String key1 = EagleConfigConstants.EAGLE_PROPS + ".partitionRefreshIntervalInMin";
- Integer partitionRefreshIntervalInMin = config.hasPath(key1) ? config.getInt(key1) : 60;
- String key2 = EagleConfigConstants.EAGLE_PROPS + ".kafkaStatisticRangeInMin";
- Integer kafkaStatisticRangeInMin = config.hasPath(key2) ? config.getInt(key2) : 60;
- PartitionStrategy strategy = new PartitionStrategyImpl(dao, algorithm, partitionRefreshIntervalInMin * DateUtils.MILLIS_PER_MINUTE, kafkaStatisticRangeInMin * DateUtils.MILLIS_PER_MINUTE);
- return strategy;
- }
-
- public static KafkaSourcedSpoutProvider createProvider(Config config) {
- String deserClsName = config.getString("dataSourceConfig.deserializerClass");
- final KafkaSourcedSpoutScheme scheme = new KafkaSourcedSpoutScheme(deserClsName, config) {
- @Override
- public List<Object> deserialize(byte[] ser) {
- Object tmp = deserializer.deserialize(ser);
- Map<String, Object> map = (Map<String, Object>)tmp;
- if(tmp == null) return null;
- return Arrays.asList(map.get("user"), tmp);
- }
- };
-
- KafkaSourcedSpoutProvider provider = new KafkaSourcedSpoutProvider() {
- @Override
- public SchemeAsMultiScheme getStreamScheme(String deserClsName, Config context) {
- return new SchemeAsMultiScheme(scheme);
- }
- };
- return provider;
- }
-
- @SuppressWarnings("unchecked")
- public static void execWithDefaultPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
- StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(Arrays.asList(0));
- //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
- //source.streamUnion(reassembler)
- source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
- .flatMap(new IPZoneDataJoinExecutor())
- .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
- env.execute();
- }
-
- @SuppressWarnings("unchecked")
- public static void execWithBalancedPartition(StormExecutionEnvironment env, KafkaSourcedSpoutProvider provider) {
- PartitionStrategy strategy = createStrategy(env.getConfig());
- StreamProducer source = env.fromSpout(provider).withOutputFields(2).nameAs("kafkaMsgConsumer").groupBy(strategy);
- //StreamProducer reassembler = source.flatMap(new HdfsUserCommandReassembler()).groupBy(Arrays.asList(0));
- //source.streamUnion(reassembler)
- source.flatMap(new FileSensitivityDataJoinExecutor()).groupBy(Arrays.asList(0))
- .flatMap(new IPZoneDataJoinExecutor())
- .alertWithConsumer("hdfsAuditLogEventStream", "hdfsAuditLogAlertExecutor");
- env.execute();
- }
-
- public static void main(String[] args) throws Exception{
- StormExecutionEnvironment env = ExecutionEnvironments.getStorm(args);
- Config config = env.getConfig();
- KafkaSourcedSpoutProvider provider = createProvider(env.getConfig());
- Boolean balancePartition = config.hasPath("eagleProps.balancePartitionEnabled") && config.getBoolean("eagleProps.balancePartitionEnabled");
- if (balancePartition) {
- execWithBalancedPartition(env, provider);
- } else {
- execWithDefaultPartition(env, provider);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java
new file mode 100644
index 0000000..d02f959
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinBolt.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.security.auditlog;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.security.auditlog.timer.FileSensitivityPollingJob;
+import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
+import org.apache.eagle.security.entity.IPZoneEntity;
+import org.apache.eagle.security.util.ExternalDataCache;
+import org.apache.eagle.security.util.ExternalDataJoiner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Tuple2;
+
+import java.util.Arrays;
+import java.util.Map;
+import java.util.TreeMap;
+
+public class IPZoneDataJoinBolt extends BaseRichBolt {
+ private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataJoinBolt.class);
+ private Config config;
+ private OutputCollector collector;
+
+ public IPZoneDataJoinBolt(Config config){
+ this.config = config;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ this.collector = collector;
+ // start ipzone data polling
+ try{
+ ExternalDataJoiner joiner = new ExternalDataJoiner(IPZonePollingJob.class, config, context.getThisComponentId() + "." + context.getThisTaskIndex());
+ joiner.start();
+ }catch(Exception ex){
+ LOG.error("Fail bring up quartz scheduler", ex);
+ throw new IllegalStateException(ex);
+ }
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ try {
+ Map<String, Object> toBeCopied = (Map<String, Object>) input.getValue(1);
+ Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy
+ Map<String, IPZoneEntity> map = (Map<String, IPZoneEntity>) ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class);
+ IPZoneEntity e = null;
+ if (map != null) {
+ e = map.get(event.get("host"));
+ }
+ event.put("securityZone", e == null ? "NA" : e.getSecurityZone());
+ if (LOG.isDebugEnabled()) LOG.debug("After IP zone lookup: " + event);
+ collector.emit(Arrays.asList(event.get("user"), event));
+ }catch(Exception ex){
+ LOG.error("error joining data, ignore it", ex);
+ }finally {
+ collector.ack(input);
+ }
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("user", "message"));
+ }
+}
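Both join bolts rely on the same handshake: a Quartz polling job publishes its latest snapshot into ExternalDataCache keyed by the job class, and the bolt reads that snapshot back on every tuple. A minimal sketch of that contract, assuming only the setJobResult/getJobResult calls visible in this diff and simplifying the value type for illustration:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
    import org.apache.eagle.security.util.ExternalDataCache;

    // Sketch only: the polling job publishes a snapshot keyed by its class, and the
    // bolt later reads it back; the String value stands in for IPZoneEntity.
    public class ExternalDataCacheSketch {
        @SuppressWarnings("unchecked")
        public static void main(String[] args) {
            ExternalDataCache.getInstance()
                    .setJobResult(IPZonePollingJob.class, Collections.singletonMap("192.168.56.101", "dmz"));
            Map<String, String> snapshot = (Map<String, String>)
                    ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class);
            System.out.println(snapshot); // {192.168.56.101=dmz}
        }
    }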
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
deleted file mode 100644
index d633dcd..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/IPZoneDataJoinExecutor.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.security.auditlog;
-
-import com.typesafe.config.Config;
-import org.apache.eagle.datastream.Collector;
-import org.apache.eagle.datastream.JavaStormStreamExecutor2;
-import org.apache.eagle.security.auditlog.timer.IPZonePollingJob;
-import org.apache.eagle.security.entity.IPZoneEntity;
-import org.apache.eagle.security.util.ExternalDataCache;
-import org.apache.eagle.security.util.ExternalDataJoiner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import scala.Tuple2;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-public class IPZoneDataJoinExecutor extends JavaStormStreamExecutor2<String, Map> {
- private static final Logger LOG = LoggerFactory.getLogger(IPZoneDataJoinExecutor.class);
- private Config config;
-
- @Override
- public void prepareConfig(Config config) {
- this.config = config;
- }
-
- @Override
- public void init() {
- // start IPZone data polling
- try{
- ExternalDataJoiner joiner = new ExternalDataJoiner(IPZonePollingJob.class, config, "1");
- joiner.start();
- }catch(Exception ex){
- LOG.error("Fail bring up quartz scheduler", ex);
- throw new IllegalStateException(ex);
- }
- }
-
- @Override
- public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, Map>> outputCollector){
- Map<String, Object> toBeCopied = (Map<String, Object>)input.get(1);
- Map<String, Object> event = new TreeMap<String, Object>(toBeCopied); // shallow copy
- Map<String, IPZoneEntity> map = (Map<String, IPZoneEntity>) ExternalDataCache.getInstance().getJobResult(IPZonePollingJob.class);
- IPZoneEntity e = null;
- if(map != null){
- e = map.get(event.get("host"));
- }
- event.put("securityZone", e == null ? "NA" : e.getSecurityZone());
- if(LOG.isDebugEnabled()) LOG.debug("After IP zone lookup: " + event);
- outputCollector.collect(new Tuple2(event.get("user"), event));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
index a4fed79..375edc7 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/FileSensitivityPollingJob.java
@@ -16,10 +16,14 @@
*/
package org.apache.eagle.security.auditlog.timer;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.security.service.HdfsSensitivityEntity;
+import org.apache.eagle.security.service.IMetadataServiceClient;
+import org.apache.eagle.security.service.MetadataServiceClientImpl;
import org.apache.eagle.security.util.ExternalDataCache;
import org.apache.eagle.security.entity.FileSensitivityAPIEntity;
import org.quartz.Job;
@@ -29,9 +33,6 @@ import org.quartz.JobExecutionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.service.client.IEagleServiceClient;
-import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import com.google.common.base.Function;
import com.google.common.collect.Maps;
@@ -43,15 +44,15 @@ public class FileSensitivityPollingJob implements Job{
throws JobExecutionException {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
try{
- List<FileSensitivityAPIEntity> ipZones = load(jobDataMap);
- if(ipZones == null){
+ Collection<HdfsSensitivityEntity> sensitivityAPIEntities = load(jobDataMap);
+ if(sensitivityAPIEntities == null){
LOG.warn("File sensitivity information is empty");
return;
}
- Map<String, FileSensitivityAPIEntity> map = Maps.uniqueIndex(ipZones, new Function<FileSensitivityAPIEntity, String>(){
+ Map<String, HdfsSensitivityEntity> map = Maps.uniqueIndex(sensitivityAPIEntities, new Function<HdfsSensitivityEntity, String>(){
@Override
- public String apply(FileSensitivityAPIEntity input) {
- return input.getTags().get("filedir");
+ public String apply(HdfsSensitivityEntity input) {
+ return input.getFiledir();
}
});
ExternalDataCache.getInstance().setJobResult(getClass(), map);
@@ -60,7 +61,7 @@ public class FileSensitivityPollingJob implements Job{
}
}
- private List<FileSensitivityAPIEntity> load(JobDataMap jobDataMap) throws Exception{
+ private Collection<HdfsSensitivityEntity> load(JobDataMap jobDataMap) throws Exception{
Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
@@ -68,15 +69,7 @@ public class FileSensitivityPollingJob implements Job{
String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
// load from eagle database
LOG.info("Load file sensitivity information from eagle service " + eagleServiceHost + ":" + eagleServicePort);
- IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
- String query = "FileSensitivityService[]{*}";
- GenericServiceAPIResponseEntity<FileSensitivityAPIEntity> response = client.search()
- .pageSize(Integer.MAX_VALUE)
- .query(query)
- .send();
- client.close();
- if(response.getException() != null)
- throw new IllegalStateException(response.getException());
- return response.getObj();
+ IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest");
+ return client.listHdfsSensitivities();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
index 2f7efc8..dc80eb9 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/java/org/apache/eagle/security/auditlog/timer/IPZonePollingJob.java
@@ -16,10 +16,13 @@
*/
package org.apache.eagle.security.auditlog.timer;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.security.service.IMetadataServiceClient;
+import org.apache.eagle.security.service.MetadataServiceClientImpl;
import org.apache.eagle.security.util.ExternalDataCache;
import org.quartz.Job;
import org.quartz.JobDataMap;
@@ -29,7 +32,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.security.entity.IPZoneEntity;
+import org.apache.eagle.security.service.IPZoneEntity;
import org.apache.eagle.service.client.IEagleServiceClient;
import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
import com.google.common.base.Function;
@@ -44,7 +47,7 @@ public class IPZonePollingJob implements Job{
throws JobExecutionException {
JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
try{
- List<IPZoneEntity> ipZones = load(jobDataMap);
+ Collection<IPZoneEntity> ipZones = load(jobDataMap);
if(ipZones == null){
LOG.warn("Ipzone information is empty");
return;
@@ -52,7 +55,7 @@ public class IPZonePollingJob implements Job{
Map<String, IPZoneEntity> map = Maps.uniqueIndex(ipZones, new Function<IPZoneEntity, String>(){
@Override
public String apply(IPZoneEntity input) {
- return input.getTags().get("iphost");
+ return input.getIphost();
}
});
ExternalDataCache.getInstance().setJobResult(getClass(), map);
@@ -61,7 +64,7 @@ public class IPZonePollingJob implements Job{
}
}
- private List<IPZoneEntity> load(JobDataMap jobDataMap) throws Exception{
+ private Collection<IPZoneEntity> load(JobDataMap jobDataMap) throws Exception{
Map<String, Object> map = (Map<String,Object>)jobDataMap.get(EagleConfigConstants.EAGLE_SERVICE);
String eagleServiceHost = (String)map.get(EagleConfigConstants.HOST);
Integer eagleServicePort = Integer.parseInt(map.get(EagleConfigConstants.PORT).toString());
@@ -69,15 +72,7 @@ public class IPZonePollingJob implements Job{
String password = map.containsKey(EagleConfigConstants.PASSWORD) ? (String)map.get(EagleConfigConstants.PASSWORD) : null;
// load from eagle database
LOG.info("Load ip zone information from eagle service " + eagleServiceHost + ":" + eagleServicePort);
- IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
- String query = "IPZoneService[]{*}";
- GenericServiceAPIResponseEntity<IPZoneEntity> response = client.search()
- .pageSize(Integer.MAX_VALUE)
- .query(query)
- .send();
- client.close();
- if(response.getException() != null)
- throw new IllegalStateException(response.getException());
- return response.getObj();
+ IMetadataServiceClient client = new MetadataServiceClientImpl(eagleServiceHost, eagleServicePort, "/rest");
+ return client.listIPZones();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
new file mode 100644
index 0000000..dadab98
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/providers/org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider.xml
@@ -0,0 +1,247 @@
+<?xml version="1.0" encoding="UTF-8" ?>
+<!--
+ ~ /*
+ ~ *
+ ~ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ * * contributor license agreements. See the NOTICE file distributed with
+ ~ * * this work for additional information regarding copyright ownership.
+ ~ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ * * (the "License"); you may not use this file except in compliance with
+ ~ * * the License. You may obtain a copy of the License at
+ ~ * * <p/>
+ ~ * * http://www.apache.org/licenses/LICENSE-2.0
+ ~ * * <p/>
+ ~ * * Unless required by applicable law or agreed to in writing, software
+ ~ * * distributed under the License is distributed on an "AS IS" BASIS,
+ ~ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ * * See the License for the specific language governing permissions and
+ ~ * * limitations under the License.
+ ~ *
+ ~ */
+ -->
+
+<application>
+ <type>HdfsAuditLogApplication</type>
+ <name>Hdfs Audit Log Monitoring Application</name>
+ <version>0.5.0-incubating</version>
+ <appClass>org.apache.eagle.security.auditlog.HdfsAuditLogApplication</appClass>
+ <viewPath>/apps/example</viewPath>
+ <configuration>
+ <property>
+ <name>dataSourceConfig.topic</name>
+ <displayName>dataSourceConfig.topic</displayName>
+ <value>hdfs_audit_log</value>
+ <description>data source topic</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkConnection</name>
+ <displayName>dataSourceConfig.zkConnection</displayName>
+ <value>server.eagle.apache.org</value>
+ <description>zk connection</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.zkConnectionTimeoutMS</name>
+ <displayName>dataSourceConfig.zkConnectionTimeoutMS</displayName>
+ <value>15000</value>
+ <description>zk connection timeout in milliseconds</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.fetchSize</name>
+ <displayName>dataSourceConfig.fetchSize</displayName>
+ <value>1048586</value>
+ <description>kafka fetch size</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKServers</name>
+ <displayName>dataSourceConfig.transactionZKServers</displayName>
+ <value>server.eagle.apache.org</value>
+ <description>zookeeper server for offset transaction</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKPort</name>
+ <displayName>dataSourceConfig.transactionZKPort</displayName>
+ <value>2181</value>
+ <description>zookeeper server port for offset transaction</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKRoot</name>
+ <displayName>dataSourceConfig.transactionZKRoot</displayName>
+ <value>/consumers</value>
+ <description>offset transaction root</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.consumerGroupId</name>
+ <displayName>dataSourceConfig.consumerGroupId</displayName>
+ <value>eagle.hdfsaudit.consumer</value>
+ <description>kafka consumer group Id</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionStateUpdateMS</name>
+ <displayName>dataSourceConfig.transactionStateUpdateMS</displayName>
+ <value>2000</value>
+ <description>zk upate</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.schemeCls</name>
+ <displayName>dataSourceConfig.schemeCls</displayName>
+ <value>storm.kafka.StringScheme</value>
+ <description>scheme class</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKPort</name>
+ <displayName>dataSourceConfig.transactionZKPort</displayName>
+ <value>2181</value>
+ <description>zookeeper server port for offset transaction</description>
+ </property>
+ <property>
+ <name>dataSourceConfig.transactionZKPort</name>
+ <displayName>dataSourceConfig.transactionZKPort</displayName>
+ <value>2181</value>
+ <description>zookeeper server port for offset transaction</description>
+ </property>
+ <property>
+ <name>topology.numOfSpoutTasks</name>
+ <displayName>topology.numOfSpoutTasks</displayName>
+ <value>2</value>
+ <description>number of spout tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfParserTasks</name>
+ <displayName>topology.numOfParserTasks</displayName>
+ <value>2</value>
+ <description>number of parser tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfSensitivityJoinTasks</name>
+ <displayName>topology.numOfSensitivityJoinTasks</displayName>
+ <value>2</value>
+ <description>number of sensitivity join tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfIPZoneJoinTasks</name>
+ <displayName>topology.numOfIPZoneJoinTasks</displayName>
+ <value>2</value>
+ <description>number of IP zone join tasks</description>
+ </property>
+ <property>
+ <name>topology.numOfSinkTasks</name>
+ <displayName>topology.numOfSinkTasks</displayName>
+ <value>2</value>
+ <description>number of sink tasks</description>
+ </property>
+ <property>
+ <name>eagleProps.dataJoinPollIntervalSec</name>
+ <displayName>eagleProps.dataJoinPollIntervalSec</displayName>
+ <value>30</value>
+ <description>interval in seconds for polling</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.host</name>
+ <displayName>eagleProps.eagleService.host</displayName>
+ <value>localhost</value>
+ <description>eagle service host</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.port</name>
+ <displayName>eagleProps.eagleService.port</displayName>
+ <value>8080</value>
+ <description>eagle service port</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.username</name>
+ <displayName>eagleProps.eagleService.username</displayName>
+ <value>admin</value>
+ <description>eagle service username</description>
+ </property>
+ <property>
+ <name>eagleProps.eagleService.password</name>
+ <displayName>eagleProps.eagleService.password</displayName>
+ <value>secret</value>
+ <description>eagle service password</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.topic</name>
+ <displayName>dataSinkConfig.topic</displayName>
+ <value>hdfs_audit_log_parsed</value>
+ <description>topic for kafka data sink</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.brokerList</name>
+ <displayName>dataSinkConfig.brokerList</displayName>
+ <value>server.eagle.apache.org:6667</value>
+ <description>kafka broker list</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.serializerClass</name>
+ <displayName>dataSinkConfig.serializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class for the Kafka message value</description>
+ </property>
+ <property>
+ <name>dataSinkConfig.keySerializerClass</name>
+ <displayName>dataSinkConfig.keySerializerClass</displayName>
+ <value>kafka.serializer.StringEncoder</value>
+ <description>serializer class for the Kafka message key</description>
+ </property>
+
+ <!-- properties for hdfs file system access and attribute resolver-->
+ <property>
+ <name>fs.defaultFS</name>
+ <displayName>fs.defaultFS</displayName>
+ <value>hdfs://server.eagle.apache.org:8020</value>
+ <description>hdfs endpoint</description>
+ </property>
+ </configuration>
+ <streams>
+ <stream>
+ <streamId>hdfs_audit_log_stream</streamId>
+ <description>Hdfs Audit Log Stream</description>
+ <validate>true</validate>
+ <timeseries>true</timeseries>
+ <columns>
+ <column>
+ <name>action</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>host</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>status</name>
+ <type>string</type>
+ </column>
+ <column>
+ <name>timestamp</name>
+ <type>long</type>
+ </column>
+ </columns>
+ </stream>
+ </streams>
+ <docs>
+ <install>
+# Step 1: Create the source kafka topic configured in "dataSourceConfig.topic" (default: hdfs_audit_log)
+
+./bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic hdfs_audit_log --partitions 1 --replication-factor 1
+
+# Step 2: Set up a data collector (e.g. logstash) to stream HDFS audit logs into the kafka topic
+
+./bin/logstash -f log_collector.conf
+
+## A `log_collector.conf` sample follows; file paths and broker addresses are illustrative and must be adapted to the target cluster:
+
+input {
+  file {
+    # adjust to the namenode's audit log location on the collector host
+    path => "/var/log/hadoop/hdfs/hdfs-audit.log"
+    start_position => "end"
+  }
+}
+filter {
+  # no transformation needed here; the topology parses raw audit log lines
+}
+output {
+  kafka {
+    # example values; use the topic and brokers configured in dataSourceConfig/dataSinkConfig above
+    topic_id => "hdfs_audit_log"
+    bootstrap_servers => "server.eagle.apache.org:6667"
+  }
+}
+
+# Step 3: start application
+
+# Step 4: monitor the parsed stream in the Eagle portal or define alert policies against it
+ </install>
+ <uninstall>
+# Step 1: stop and uninstall application
+# Step 2: delete the source kafka topic created in the install step (default: hdfs_audit_log)
+# Step 3: stop logstash
+ </uninstall>
+ </docs>
+</application>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
new file mode 100644
index 0000000..42cf62b
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/META-INF/services/org.apache.eagle.app.spi.ApplicationProvider
@@ -0,0 +1,37 @@
+#
+# /*
+# *
+# * * Licensed to the Apache Software Foundation (ASF) under one or more
+# * * contributor license agreements. See the NOTICE file distributed with
+# * * this work for additional information regarding copyright ownership.
+# * * The ASF licenses this file to You under the Apache License, Version 2.0
+# * * (the "License"); you may not use this file except in compliance with
+# * * the License. You may obtain a copy of the License at
+# * * <p/>
+# * * http://www.apache.org/licenses/LICENSE-2.0
+# * * <p/>
+# * * Unless required by applicable law or agreed to in writing, software
+# * * distributed under the License is distributed on an "AS IS" BASIS,
+# * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# * * See the License for the specific language governing permissions and
+# * * limitations under the License.
+# *
+# */
+#
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.eagle.security.auditlog.HdfsAuditLogAppProvider
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
index 3c3572e..efa6467 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/application.conf
@@ -14,56 +14,42 @@
# limitations under the License.
{
- "envContextConfig" : {
- "env" : "storm",
- "mode" : "local",
- "topologyName" : "auditLogProcessTopology",
- "stormConfigFile" : "security-auditlog-storm.yaml",
- "parallelismConfig" : {
- "kafkaMsgConsumer" : 1,
- "hdfsAuditLogAlertExecutor*" : 1
- }
+ "appId" : "HdfsAuditLogApp",
+ "mode" : "LOCAL",
+ "siteId" : "testsite",
+ "topology" : {
+ "numOfTotalWorkers" : 2,
+ "numOfSpoutTasks" : 2,
+ "numOfParserTasks" : 2,
+ "numOfSensitivityJoinTasks" : 2,
+ "numOfIPZoneJoinTasks" : 2,
+ "numOfSinkTasks" : 2
},
"dataSourceConfig": {
- "topic" : "sandbox_hdfs_audit_log",
- "zkConnection" : "sandbox.hortonworks.com:2181",
+ "topic" : "hdfs_audit_log",
+ "zkConnection" : "server.eagle.apache.org:2181",
"zkConnectionTimeoutMS" : 15000,
"consumerGroupId" : "EagleConsumer",
"fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
- "transactionZKServers" : "sandbox.hortonworks.com",
+ "transactionZKServers" : "server.eagle.apache.org",
"transactionZKPort" : 2181,
"transactionZKRoot" : "/consumers",
- "transactionStateUpdateMS" : 2000
- },
- "alertExecutorConfigs" : {
- "hdfsAuditLogAlertExecutor" : {
- "parallelism" : 1,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
+ "transactionStateUpdateMS" : 2000,
+ "schemeCls" : "storm.kafka.StringScheme"
},
"eagleProps" : {
- "site" : "sandbox",
- "application": "hdfsAuditLog",
"dataJoinPollIntervalSec" : 30,
- "mailHost" : "mailHost.com",
- "mailSmtpPort":"25",
- "mailDebug" : "true",
- "balancePartitionEnabled" : true,
- #"partitionRefreshIntervalInMin" : 60,
- #"kafkaStatisticRangeInMin" : 60,
"eagleService": {
"host": "localhost",
- "port": 38080,
+ "port": 9090,
"username": "admin",
"password": "secret"
- },
- "readHdfsUserCommandPatternFrom" : "file"
+ }
},
- "dynamicConfigSource" : {
- "enabled" : true,
- "initDelayMillis" : 0,
- "delayMillis" : 30000
+ "dataSinkConfig": {
+ "topic" : "hdfs_audit_log_parsed",
+ "brokerList" : "server.eagle.apache.org:6667",
+ "serializerClass" : "kafka.serializer.StringEncoder",
+ "keySerializerClass" : "kafka.serializer.StringEncoder"
}
-}
\ No newline at end of file
+}
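For orientation, the renamed keys above are read with the Typesafe Config API the topology already uses. A minimal sketch, not part of this patch; variable names are illustrative:

    Config config = ConfigFactory.load();                              // loads application.conf from the classpath
    String topic = config.getString("dataSourceConfig.topic");         // "hdfs_audit_log"
    int spoutTasks = config.getInt("topology.numOfSpoutTasks");        // 2
    String brokerList = config.getString("dataSinkConfig.brokerList"); // "server.eagle.apache.org:6667"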
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
index 4a22987..e442c46 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
@@ -13,17 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-log4j.rootLogger=DEBUG, stdout, DRFA
+log4j.rootLogger=INFO, stdout, DRFA
eagle.log.dir=./logs
eagle.log.file=eagle.log
-#log4j.logger.org.apache.eagle.security.auditlog.IPZoneDataJoinExecutor=DEBUG
-#log4j.logger.org.apache.eagle.security.auditlog.FileSensitivityDataJoinExecutor=DEBUG
-log4j.logger.org.apache.eagle.security.auditlog.HdfsUserCommandReassembler=DEBUG
-#log4j.logger.org.apache.eagle.executor.AlertExecutor=DEBUG
-# standard output
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
deleted file mode 100644
index a68a323..0000000
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/security-auditlog-storm.yaml
+++ /dev/null
@@ -1,18 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-topology.workers: 1
-topology.acker.executors: 1
-topology.tasks: 1
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
index a19e9b6..753eb41 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/java/org/apache/eagle/security/auditlog/TestUserCommandReassembler.java
@@ -22,6 +22,8 @@ package org.apache.eagle.security.auditlog;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.security.hdfs.HDFSAuditLogObject;
+import org.apache.eagle.security.hdfs.HDFSAuditLogParser;
import org.junit.Assert;
import org.junit.Test;
import scala.Tuple2;
@@ -32,9 +34,18 @@ import java.util.*;
* Created by yonzhang on 11/24/15.
*/
public class TestUserCommandReassembler {
- private Map<String, Object> parseEvent(String log){
- HdfsAuditLogKafkaDeserializer deserializer = new HdfsAuditLogKafkaDeserializer(null);
- return (Map<String, Object>)deserializer.deserialize(log.getBytes());
+ private Map parseEvent(String log) throws Exception{
+ HDFSAuditLogParser deserializer = new HDFSAuditLogParser();
+ HDFSAuditLogObject entity = deserializer.parse(log);
+ Map<String, Object> map = new TreeMap<String, Object>();
+ map.put("src", entity.src);
+ map.put("dst", entity.dst);
+ map.put("host", entity.host);
+ map.put("timestamp", entity.timestamp);
+ map.put("allowed", entity.allowed);
+ map.put("user", entity.user);
+ map.put("cmd", entity.cmd);
+ return map;
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog
new file mode 100644
index 0000000..361304d
--- /dev/null
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/test/resources/securityAuditLog
@@ -0,0 +1,17 @@
+2015-04-24 12:49:16,145 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc
+2015-04-24 12:49:16,192 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user/ambari-qa dst=null perm=null proto=rpc
+2015-04-24 12:49:20,518 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc
+2015-04-24 12:49:20,570 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user/ambari-qa dst=null perm=null proto=rpc
+2015-04-24 12:49:20,587 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/ dst=null perm=null proto=rpc
+2015-04-24 12:49:20,664 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=mkdirs src=/tmp dst=null perm=hdfs:hdfs:rwxr-xr-x proto=rpc
+2015-04-24 12:49:20,677 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user dst=null perm=null proto=rpc
+2015-04-24 12:49:20,686 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=mkdirs src=/user/ambari-qa dst=null perm=hdfs:hdfs:rwxr-xr-x proto=rpc
+2015-04-24 12:49:24,828 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc
+2015-04-24 12:49:24,915 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=setPermission src=/tmp dst=null perm=hdfs:hdfs:rwxrwxrwx proto=rpc
+2015-04-24 12:49:29,375 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user/ambari-qa dst=null perm=null proto=rpc
+2015-04-24 12:49:29,453 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=setPermission src=/user/ambari-qa dst=null perm=hdfs:hdfs:rwxrwx--- proto=rpc
+2015-04-24 12:49:33,542 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc
+2015-04-24 12:49:37,844 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/user/ambari-qa dst=null perm=null proto=rpc
+2015-04-24 12:49:37,929 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=setOwner src=/user/ambari-qa dst=null perm=ambari-qa:hdfs:rwxrwx--- proto=rpc
+2015-04-24 12:51:31,798 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/apps/hbase/data dst=null perm=null proto=rpc
+2015-04-24 12:51:31,863 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) ip=/10.0.2.15 cmd=getfileinfo src=/apps/hbase/staging dst=null perm=null proto=rpc
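These sample lines drive the updated TestUserCommandReassembler above, which now parses through HDFSAuditLogParser. A minimal sketch of parsing one of them, assuming parse() throws a checked exception as in the test; the commented field values are approximate and depend on how the parser trims the ugi and ip fields:

    HDFSAuditLogParser parser = new HDFSAuditLogParser();
    HDFSAuditLogObject entity = parser.parse(
            "2015-04-24 12:49:16,145 INFO FSNamesystem.audit: allowed=true ugi=hdfs (auth:SIMPLE) "
            + "ip=/10.0.2.15 cmd=getfileinfo src=/tmp dst=null perm=null proto=rpc");
    // entity.allowed   -> true
    // entity.cmd       -> "getfileinfo"
    // entity.src       -> "/tmp"
    // entity.user      -> "hdfs" (approximate)
    // entity.timestamp -> epoch millis for 2015-04-24 12:49:16,145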
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java
index b2a2671..a0a230a 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSCommandResolver.java
@@ -16,6 +16,8 @@
*/
package org.apache.eagle.service.security.hdfs.resolver;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
import org.apache.eagle.service.alert.resolver.AttributeResolvable;
import org.apache.eagle.service.alert.resolver.AttributeResolveException;
import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException;
@@ -33,6 +35,10 @@ import java.util.regex.Pattern;
public class HDFSCommandResolver implements AttributeResolvable<GenericAttributeResolveRequest,String> {
private final static Logger LOG = LoggerFactory.getLogger(HDFSCommandResolver.class);
+ public HDFSCommandResolver(ApplicationEntityService entityService, Config eagleServerConfig){
+
+ }
+
private final static String [] cmdStrs = {"open", "create", "append", "delete", "listfileinfo", "rename",
"mkdirs", "listStatus", "setReplication", "setOwner", "setPermission", "setTimes", "setXAttr", "removeXAttr", "getXAttrs",
"contentSummary", "createEncryptionZone", "checkAccess"};
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java
index 4326c93..370d9a3 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSResourceResolver.java
@@ -18,16 +18,19 @@ package org.apache.eagle.service.security.hdfs.resolver;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.typesafe.config.Config;
-import org.apache.eagle.security.resolver.MetadataAccessConfigRepo;
+import org.apache.eagle.metadata.model.ApplicationEntity;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
import org.apache.eagle.service.alert.resolver.AttributeResolvable;
import org.apache.eagle.service.alert.resolver.AttributeResolveException;
import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException;
import org.apache.eagle.service.alert.resolver.GenericAttributeResolveRequest;
+import org.apache.eagle.service.security.hdfs.rest.HDFSResourceWebResource;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.slf4j.Logger;
@@ -45,6 +48,12 @@ import org.apache.eagle.service.security.hdfs.HDFSResourceConstants;
*/
public class HDFSResourceResolver implements AttributeResolvable<GenericAttributeResolveRequest,String> {
private final static Logger LOG = LoggerFactory.getLogger(HDFSResourceResolver.class);
+ private ApplicationEntityService entityService;
+
+ public HDFSResourceResolver(ApplicationEntityService entityService, Config eagleServerConfig){
+ this.entityService = entityService;
+ }
+
/**
* HDFS Resource Resolve API
*
@@ -54,10 +63,9 @@ public class HDFSResourceResolver implements AttributeResolvable<GenericAttribu
public List<String> resolve(GenericAttributeResolveRequest request)
throws AttributeResolveException {
List<String> result = new ArrayList<>();
- MetadataAccessConfigRepo repo = new MetadataAccessConfigRepo();
try {
- Config config = repo.getConfig(HDFSResourceConstants.HDFS_APPLICATION, request.getSite().trim());
- Configuration conf = repo.convert(config);
+ Map<String, Object> config = getAppConfig(request.getSite(), HDFSResourceWebResource.HDFS_APPLICATION);
+ Configuration conf = convert(config);
HDFSFileSystem fileSystem = new HDFSFileSystem(conf);
String query = request.getQuery().trim();
List<FileStatus> fileStatuses = null;
@@ -86,6 +94,19 @@ public class HDFSResourceResolver implements AttributeResolvable<GenericAttribu
}
}
+ private Map<String, Object> getAppConfig(String site, String appType){
+ ApplicationEntity entity = entityService.getBySiteIdAndAppType(site, appType);
+ return entity.getConfiguration();
+ }
+
+ private Configuration convert(Map<String, Object> originalConfig) throws Exception {
+ Configuration config = new Configuration();
+ for (Map.Entry<String, Object> entry : originalConfig.entrySet()) {
+ config.set(entry.getKey().toString(), entry.getValue().toString());
+ }
+ return config;
+ }
+
/**
* Validate the Passed Request Object
* It should have Site Id and File Path
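The two helpers above replace MetadataAccessConfigRepo: the resolver now pulls the HDFS application's own configuration (e.g. the fs.defaultFS property declared in the application descriptor earlier in this patch) via ApplicationEntityService and copies it into a Hadoop Configuration. A minimal sketch of that flow; the site name is illustrative and error handling is omitted:

    Map<String, Object> appConfig = entityService
            .getBySiteIdAndAppType("testsite", HDFSResourceWebResource.HDFS_APPLICATION)
            .getConfiguration();                                         // contains e.g. fs.defaultFS=hdfs://server.eagle.apache.org:8020
    Configuration conf = new Configuration();
    for (Map.Entry<String, Object> entry : appConfig.entrySet()) {
        conf.set(entry.getKey(), String.valueOf(entry.getValue()));      // same copy loop as convert() above
    }
    HDFSFileSystem fileSystem = new HDFSFileSystem(conf);                // used by resolve() to list candidate paths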
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/27513f7b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java
index f1d8808..5f3ec54 100644
--- a/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java
+++ b/eagle-security/eagle-security-hdfs-web/src/main/java/org/apache/eagle/service/security/hdfs/resolver/HDFSSensitivityTypeResolver.java
@@ -16,6 +16,13 @@
*/
package org.apache.eagle.service.security.hdfs.resolver;
+import com.google.inject.Inject;
+import com.typesafe.config.Config;
+import org.apache.eagle.metadata.service.ApplicationEntityService;
+import org.apache.eagle.security.service.HBaseSensitivityEntity;
+import org.apache.eagle.security.service.HdfsSensitivityEntity;
+import org.apache.eagle.security.service.ISecurityMetadataDAO;
+import org.apache.eagle.security.service.MetadataDaoFactory;
import org.apache.eagle.service.alert.resolver.AttributeResolvable;
import org.apache.eagle.service.alert.resolver.AttributeResolveException;
import org.apache.eagle.service.alert.resolver.BadAttributeResolveRequestException;
@@ -24,16 +31,17 @@ import org.apache.eagle.service.security.hdfs.HDFSResourceSensitivityService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.regex.Pattern;
public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericAttributeResolveRequest,String> {
private final static Logger LOG = LoggerFactory.getLogger(HDFSSensitivityTypeResolver.class);
- private HDFSResourceSensitivityService dao = new HDFSResourceSensitivityService();
- private Map<String, Map<String, String>> maps = dao.getAllFileSensitivityMap();
+ private ISecurityMetadataDAO dao;
+ @Inject
+ public HDFSSensitivityTypeResolver(ApplicationEntityService entityService, Config eagleServerConfig){
+ dao = MetadataDaoFactory.getMetadataDAO(eagleServerConfig);
+ }
private final static String SENSITIVETYPE_ATTRIBUTE_RESOLVE_FORMAT_HINT = "Sensitive type should be composed of a-z, A-Z, 0-9 or -";
@@ -41,6 +49,7 @@ public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericA
String query = request.getQuery().trim();
String site = request.getSite().trim();
List<String> res = new ArrayList<>();
+ Map<String, Map<String, String>> maps = getAllSensitivities();
Map<String, String> map = maps.get(site);
if(map == null) {
@@ -72,4 +81,16 @@ public class HDFSSensitivityTypeResolver implements AttributeResolvable<GenericA
public Class<GenericAttributeResolveRequest> getRequestClass() {
return GenericAttributeResolveRequest.class;
}
+
+ private Map<String, Map<String, String>> getAllSensitivities(){
+ Map<String, Map<String, String>> all = new HashMap<>();
+ Collection<HdfsSensitivityEntity> entities = dao.listHdfsSensitivities();
+ for(HdfsSensitivityEntity entity : entities){
+ if(!all.containsKey(entity.getSite())){
+ all.put(entity.getSite(), new HashMap<>());
+ }
+ all.get(entity.getSite()).put(entity.getFiledir(), entity.getSensitivityType());
+ }
+ return all;
+ }
}
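For reference, getAllSensitivities() above builds a site -> (path -> sensitivity type) map from the metadata store; a small illustration with made-up site, path and type values:

    // e.g. { "testsite" -> { "/tmp/private" -> "PII", "/apps/hbase/data" -> "AUDIT" } }
    Map<String, Map<String, String>> all = getAllSensitivities();
    Map<String, String> siteMap = all.get("testsite");       // sensitivity-tagged paths for one site
    String sensitivityType = siteMap.get("/tmp/private");    // "PII"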