You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ce...@apache.org on 2016/03/08 22:45:54 UTC
incubator-metron git commit: METRON-62 Add writing to enriched data
back to the enrichment topology closes apache/incubator-metron#39
Repository: incubator-metron
Updated Branches:
refs/heads/master 560f7abe7 -> d043f2453
METRON-62 Add writing to enriched data back to the enrichment topology closes apache/incubator-metron#39
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/d043f245
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/d043f245
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/d043f245
Branch: refs/heads/master
Commit: d043f24536d14fd1421d5b72af4a7dd92c69ec38
Parents: 560f7ab
Author: cestella <ce...@gmail.com>
Authored: Tue Mar 8 16:45:45 2016 -0500
Committer: cstella <ce...@gmail.com>
Committed: Tue Mar 8 16:45:45 2016 -0500
----------------------------------------------------------------------
.../roles/metron_streaming/tasks/main.yml | 13 ++
.../metron_streaming/tasks/threat_intel.yml | 2 -
deployment/roles/metron_streaming/vars/main.yml | 2 +
.../metron/bolt/BulkMessageWriterBolt.java | 3 +-
.../java/org/apache/metron/utils/JSONUtils.java | 8 +
.../writer/interfaces/BulkMessageWriter.java | 3 +-
.../src/main/bash/latency_summarizer.sh | 32 ++++
.../src/main/bash/start_topology.sh | 22 +++
.../src/main/bash/zk_load_configs.sh | 33 ++++
.../enrichment/bolt/EnrichmentSplitterBolt.java | 4 +-
metron-streaming/Metron-Indexing/pom.xml | 16 +-
.../metron/writer/ElasticsearchWriter.java | 2 +-
.../org/apache/metron/writer/HdfsWriter.java | 44 -----
.../apache/metron/writer/hdfs/HdfsWriter.java | 94 ++++++++++
.../writer/hdfs/SourceAwareMoveAction.java | 48 +++++
.../writer/hdfs/SourceFileNameFormat.java | 48 +++++
.../metron/writer/hdfs/SourceHandler.java | 160 ++++++++++++++++
.../apache/metron/utils/LatencySummarizer.java | 188 +++++++++++++++++++
.../apache/metron/utils/SourceConfigUtils.java | 61 ++++--
.../Metron_Configs/etc/env/config.properties | 2 +-
.../topologies/enrichment/remote.yaml | 33 ++++
.../topologies/enrichment/test.yaml | 34 ++++
.../integration/EnrichmentIntegrationTest.java | 153 +++++++++++----
23 files changed, 895 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml
index ad1e081..b1b8734 100644
--- a/deployment/roles/metron_streaming/tasks/main.yml
+++ b/deployment/roles/metron_streaming/tasks/main.yml
@@ -44,6 +44,12 @@
- name: Add hbase-site.xml to topology jar
shell: cd {{ hbase_config_path }} && jar -uf {{ metron_directory }}/lib/{{ metron_jar_name }} hbase-site.xml
+- name: Add core-site.xml to topology jar
+ shell: cd {{ hdfs_config_path }} && jar -uf {{ metron_directory }}/lib/{{ metron_jar_name }} core-site.xml
+
+- name: Add hdfs-site.xml to topology jar
+ shell: cd {{ hdfs_config_path }} && jar -uf {{ metron_directory }}/lib/{{ metron_jar_name }} hdfs-site.xml
+
- name: Copy Metron topology config files
copy:
src: "{{ metron_src_config_path }}/{{ item }}"
@@ -56,6 +62,12 @@
include_vars: "../roles/mysql_server/vars/main.yml"
when: mysql_root_password is undefined
+- name: Create root user HDFS directory
+ command: su - hdfs -c "hdfs dfs -mkdir -p /user/root && hdfs dfs -chown root:root /user/root"
+
+- name: Create Metron HDFS output directory
+ command: su - hdfs -c "hdfs dfs -mkdir -p {{ metron_hdfs_output_dir }} && hdfs dfs -chown hdfs:hadoop {{ metron_hdfs_output_dir }} && hdfs dfs -chmod 775 {{ metron_hdfs_output_dir }}"
+
- name: Configure Metron topologies
lineinfile: >
dest={{ metron_properties_config_path }}
@@ -77,6 +89,7 @@
- { regexp: "threat.intel.ip.cf=", line: "threat.intel.ip.cf=t" }
- { regexp: "mysql.ip=", line: "mysql.ip={{ groups.mysql[0] }}" }
- { regexp: "mysql.password=", line: "mysql.password={{ mysql_root_password }}" }
+ - { regexp: "index.hdfs.output=", line: "index.hdfs.output={{ metron_hdfs_output_dir }}/enrichment/indexed" }
- name: Add Elasticsearch templates for topologies
uri:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/deployment/roles/metron_streaming/tasks/threat_intel.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/threat_intel.yml b/deployment/roles/metron_streaming/tasks/threat_intel.yml
index 0439e46..6798319 100644
--- a/deployment/roles/metron_streaming/tasks/threat_intel.yml
+++ b/deployment/roles/metron_streaming/tasks/threat_intel.yml
@@ -15,8 +15,6 @@
# limitations under the License.
#
---
-- name: Create root user HDFS directory
- command: su - hdfs -c "hdfs dfs -mkdir -p /user/root && hdfs dfs -chown root:root /user/root"
- name: Create Bulk load working Directory
file:
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/deployment/roles/metron_streaming/vars/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/vars/main.yml b/deployment/roles/metron_streaming/vars/main.yml
index b93a70f..83379c6 100644
--- a/deployment/roles/metron_streaming/vars/main.yml
+++ b/deployment/roles/metron_streaming/vars/main.yml
@@ -26,3 +26,5 @@ elasticsearch_config_path: /etc/elasticsearch
elasticsearch_cluster_name: metron
elasticsearch_transport_port: 9300
hbase_config_path: "/etc/hbase/conf"
+hdfs_config_path: "/etc/hadoop/conf"
+metron_hdfs_output_dir: "/apps/metron"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
index a8fda69..d827536 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/bolt/BulkMessageWriterBolt.java
@@ -45,7 +45,6 @@ public class BulkMessageWriterBolt extends ConfiguredBolt {
private Map<String, List<Tuple>> sourceTupleMap = new HashMap<>();
private Map<String, List<JSONObject>> sourceMessageMap = new HashMap<>();
-
public BulkMessageWriterBolt(String zookeeperUrl) {
super(zookeeperUrl);
}
@@ -59,7 +58,7 @@ public class BulkMessageWriterBolt extends ConfiguredBolt {
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
super.prepare(stormConf, context, collector);
- bulkMessageWriter.init();
+ bulkMessageWriter.init(stormConf);
}
@SuppressWarnings("unchecked")
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
index d704908..93b0a58 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/utils/JSONUtils.java
@@ -19,6 +19,7 @@
package org.apache.metron.utils;
import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
@@ -51,6 +52,13 @@ public enum JSONUtils {
}
};
+ public <T> T load(InputStream is, TypeReference<T> ref) throws IOException {
+ return _mapper.get().readValue(is, ref);
+ }
+ public <T> T load(String is, TypeReference<T> ref) throws IOException {
+ return _mapper.get().readValue(is, ref);
+ }
+
public <T> T load(InputStream is, Class<T> clazz) throws IOException {
return _mapper.get().readValue(is, clazz);
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
index 90c0261..9b627e6 100644
--- a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/writer/interfaces/BulkMessageWriter.java
@@ -21,10 +21,11 @@ import backtype.storm.tuple.Tuple;
import org.apache.metron.domain.SourceConfig;
import java.util.List;
+import java.util.Map;
public interface BulkMessageWriter<T> extends AutoCloseable {
- void init();
+ void init(Map stormConf);
void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<T> messages) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh b/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
new file mode 100755
index 0000000..335c3fd
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/latency_summarizer.sh
@@ -0,0 +1,32 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Optionally source the Bigtop-provided hbase defaults file when it is readable.
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+# Expansions are quoted so a directory containing whitespace does not split into
+# several words; two chained tests replace the obsolescent "-a" operator.
+[ -n "${BIGTOP_DEFAULTS_DIR}" ] && [ -r "${BIGTOP_DEFAULTS_DIR}/hbase" ] && . "${BIGTOP_DEFAULTS_DIR}/hbase"
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+export METRON_VERSION=0.1BETA
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=Metron-Topologies-$METRON_VERSION.jar
+# NOTE(review): "java" is resolved from PATH here, not from the detected JAVA_HOME - confirm intended.
+# All command-line arguments are forwarded unchanged to LatencySummarizer.
+java -cp "$METRON_HOME/lib/$TOPOLOGIES_JAR" org.apache.metron.utils.LatencySummarizer "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh b/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
new file mode 100755
index 0000000..21626c2
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/start_topology.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+METRON_VERSION=0.1BETA
+METRON_HOME=/usr/metron/$METRON_VERSION
+TOPOLOGY_JAR=Metron-Topologies-$METRON_VERSION.jar
+storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/config/topologies/$1/remote.yaml --filter $METRON_HOME/config/etc/env/config.properties
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh b/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
new file mode 100755
index 0000000..4273b7d
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/zk_load_configs.sh
@@ -0,0 +1,33 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Optionally source the Bigtop-provided hbase defaults file when it is readable.
+BIGTOP_DEFAULTS_DIR=${BIGTOP_DEFAULTS_DIR-/etc/default}
+# Expansions are quoted so a directory containing whitespace does not split into
+# several words; two chained tests replace the obsolescent "-a" operator.
+[ -n "${BIGTOP_DEFAULTS_DIR}" ] && [ -r "${BIGTOP_DEFAULTS_DIR}/hbase" ] && . "${BIGTOP_DEFAULTS_DIR}/hbase"
+
+# Autodetect JAVA_HOME if not defined
+if [ -e /usr/libexec/bigtop-detect-javahome ]; then
+  . /usr/libexec/bigtop-detect-javahome
+elif [ -e /usr/lib/bigtop-utils/bigtop-detect-javahome ]; then
+  . /usr/lib/bigtop-utils/bigtop-detect-javahome
+fi
+export METRON_VERSION=0.1BETA
+export METRON_HOME=/usr/metron/$METRON_VERSION
+export TOPOLOGIES_JAR=Metron-Topologies-$METRON_VERSION.jar
+# NOTE(review): ZK_HOME defaults to the hbase-client directory and is not referenced
+# again in this script - confirm whether the Java tool reads it from the environment.
+export ZK_HOME=${ZK_HOME:-/usr/hdp/current/hbase-client}
+# All command-line arguments are forwarded unchanged to SourceConfigUtils.
+java -cp "$METRON_HOME/lib/$TOPOLOGIES_JAR" org.apache.metron.utils.SourceConfigUtils "$@"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
index 51508d8..7970674 100644
--- a/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
+++ b/metron-streaming/Metron-EnrichmentAdapters/src/main/java/org/apache/metron/enrichment/bolt/EnrichmentSplitterBolt.java
@@ -82,12 +82,13 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
byte[] data = tuple.getBinary(0);
try {
message = (JSONObject) parser.parse(new String(data, "UTF8"));
+ message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
} catch (ParseException | UnsupportedEncodingException e) {
e.printStackTrace();
}
} else {
message = (JSONObject) tuple.getValueByField(messageFieldName);
- message.put(getClass().getSimpleName().toLowerCase() + ".splitter.ts", "" + System.currentTimeMillis());
+ message.put(getClass().getSimpleName().toLowerCase() + ".splitter.begin.ts", "" + System.currentTimeMillis());
}
return message;
}
@@ -119,6 +120,7 @@ public class EnrichmentSplitterBolt extends SplitBolt<JSONObject> {
streamMessageMap.put(enrichmentType, enrichmentObject);
}
}
+ message.put(getClass().getSimpleName().toLowerCase() + ".splitter.end.ts", "" + System.currentTimeMillis());
return streamMessageMap;
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/pom.xml b/metron-streaming/Metron-Indexing/pom.xml
index e2a1037..1f5d04d 100644
--- a/metron-streaming/Metron-Indexing/pom.xml
+++ b/metron-streaming/Metron-Indexing/pom.xml
@@ -92,7 +92,21 @@
<version>1.9</version>
<scope>provided</scope>
</dependency>
-
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-hdfs</artifactId>
+ <version>${global_storm_version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>storm-core</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<reporting>
<plugins>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
index 2769efe..e8d654d 100644
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -60,7 +60,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
}
@Override
- public void init() {
+ public void init(Map stormConf) {
ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
builder.put("cluster.name", clusterName);
builder.put("client.transport.ping_timeout","500s");
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/HdfsWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/HdfsWriter.java
deleted file mode 100644
index eace952..0000000
--- a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/HdfsWriter.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.metron.writer;
-
-import backtype.storm.tuple.Tuple;
-import org.apache.metron.domain.SourceConfig;
-import org.apache.metron.writer.interfaces.BulkMessageWriter;
-import org.json.simple.JSONObject;
-
-import java.io.Serializable;
-import java.util.List;
-
-public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
-
- @Override
- public void init() {
-
- }
-
- @Override
- public void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
-
- }
-
- @Override
- public void close() {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
new file mode 100644
index 0000000..d2cc827
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/HdfsWriter.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.writer.hdfs;
+
+import backtype.storm.tuple.Tuple;
+import org.apache.metron.domain.SourceConfig;
+import org.apache.metron.writer.interfaces.BulkMessageWriter;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.NoRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.json.simple.JSONObject;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Bulk message writer that routes each batch of JSON messages to a
+ * {@link SourceHandler} dedicated to the batch's source type. Handlers are
+ * created on first use and cached for the lifetime of the writer.
+ */
+public class HdfsWriter implements BulkMessageWriter<JSONObject>, Serializable {
+  // Rotation actions passed to every handler this writer creates.
+  List<RotationAction> rotationActions = new ArrayList<>();
+  // Default rotation policy; replaceable through withRotationPolicy.
+  FileRotationPolicy rotationPolicy = new NoRotationPolicy();
+  // Default sync policy uses a count of 1, i.e. sync on every write.
+  SyncPolicy syncPolicy = new CountSyncPolicy(1);
+  // Base file name format; wrapped per source type when a handler is built.
+  FileNameFormat fileNameFormat;
+  // Cache of handlers keyed by source type.
+  Map<String, SourceHandler> sourceHandlerMap = new HashMap<>();
+  // Storm configuration captured in init; excluded from serialization.
+  transient Map stormConfig;
+
+  /**
+   * Sets the base file name format.
+   *
+   * @param fileNameFormat format wrapped in a {@link SourceFileNameFormat} per source type
+   * @return this writer, for chaining
+   */
+  public HdfsWriter withFileNameFormat(FileNameFormat fileNameFormat){
+    this.fileNameFormat = fileNameFormat;
+    return this;
+  }
+
+  /**
+   * Replaces the sync policy.
+   *
+   * @param syncPolicy policy handed to newly created handlers
+   * @return this writer, for chaining
+   */
+  public HdfsWriter withSyncPolicy(SyncPolicy syncPolicy){
+    this.syncPolicy = syncPolicy;
+    return this;
+  }
+
+  /**
+   * Replaces the rotation policy.
+   *
+   * @param rotationPolicy policy handed to newly created handlers
+   * @return this writer, for chaining
+   */
+  public HdfsWriter withRotationPolicy(FileRotationPolicy rotationPolicy){
+    this.rotationPolicy = rotationPolicy;
+    return this;
+  }
+
+  /**
+   * Appends a rotation action to the list handed to newly created handlers.
+   *
+   * @param action action to append
+   * @return this writer, for chaining
+   */
+  public HdfsWriter addRotationAction(RotationAction action){
+    rotationActions.add(action);
+    return this;
+  }
+
+  /**
+   * Stores the Storm configuration so it can be passed to handlers created later.
+   *
+   * @param stormConfig the Storm configuration map
+   */
+  @Override
+  public void init(Map stormConfig) {
+    this.stormConfig = stormConfig;
+  }
+
+  /**
+   * Hands the messages to the handler for {@code sourceType}. The
+   * {@code configuration} and {@code tuples} arguments are not used here.
+   *
+   * @param sourceType source type used to select (or create) the handler
+   * @param configuration unused by this writer
+   * @param tuples unused by this writer
+   * @param messages messages passed to the handler
+   * @throws Exception if the handler cannot be created or fails to handle the messages
+   */
+  @Override
+  public void write(String sourceType, SourceConfig configuration, List<Tuple> tuples, List<JSONObject> messages) throws Exception {
+    getSourceHandler(sourceType).handle(messages);
+  }
+
+  /**
+   * Closes every handler created so far.
+   */
+  @Override
+  public void close() {
+    for (Map.Entry<String, SourceHandler> entry : sourceHandlerMap.entrySet()) {
+      entry.getValue().close();
+    }
+  }
+
+  /**
+   * Returns the cached handler for {@code sourceType}, building and caching a
+   * new one when none exists yet. Synchronized on this writer.
+   *
+   * @param sourceType source type whose handler is wanted
+   * @return the handler for that source type
+   * @throws IOException if a new handler cannot be constructed
+   */
+  private synchronized SourceHandler getSourceHandler(String sourceType) throws IOException {
+    SourceHandler existing = sourceHandlerMap.get(sourceType);
+    if (existing != null) {
+      return existing;
+    }
+    SourceFileNameFormat perSourceFormat = new SourceFileNameFormat(sourceType, fileNameFormat);
+    SourceHandler created = new SourceHandler(rotationActions, rotationPolicy, syncPolicy, perSourceFormat, stormConfig);
+    sourceHandlerMap.put(sourceType, created);
+    return created;
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
new file mode 100644
index 0000000..1c345b4
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceAwareMoveAction.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.Logger;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+
+import java.io.IOException;
+
+/**
+ * Rotation action that moves a file to
+ * {@code <destination>/<name of the file's parent directory>/<file name>},
+ * so files keep their per-source grouping under the destination directory.
+ */
+public class SourceAwareMoveAction implements RotationAction{
+  // Registered under this class so log lines are attributed to the move action itself.
+  private static final Logger LOG = Logger.getLogger(SourceAwareMoveAction.class);
+  private String destination;
+
+  /**
+   * Sets the base directory that files are moved under.
+   *
+   * @param destDir base destination directory
+   * @return this action, for chaining
+   */
+  public SourceAwareMoveAction toDestination(String destDir){
+    destination = destDir;
+    return this;
+  }
+
+  /**
+   * Derives the source name from the name of the directory containing the file.
+   *
+   * @param filePath path of the file being moved
+   * @return name of the file's parent directory
+   */
+  private static String getSource(Path filePath) {
+    return filePath.getParent().getName();
+  }
+
+  /**
+   * Renames {@code filePath} to the matching location under the destination.
+   *
+   * @param fileSystem file system on which the rename is performed
+   * @param filePath file to move
+   * @throws IOException if the underlying rename call throws
+   */
+  @Override
+  public void execute(FileSystem fileSystem, Path filePath) throws IOException {
+    Path destPath = new Path(new Path(destination, getSource(filePath)), filePath.getName());
+    LOG.info("Moving file " + filePath + " to " + destPath);
+    // rename reports some failures through its boolean result rather than an
+    // exception; surface those in the log instead of dropping them silently.
+    if (!fileSystem.rename(filePath, destPath)) {
+      LOG.warn("Unable to move file " + filePath + " to " + destPath);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
new file mode 100644
index 0000000..ae0242d
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceFileNameFormat.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import backtype.storm.task.TopologyContext;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+
+import java.util.Map;
+
+public class SourceFileNameFormat implements FileNameFormat {
+ FileNameFormat delegate;
+ String sourceType;
+ public SourceFileNameFormat(String sourceType, FileNameFormat delegate) {
+ this.delegate = delegate;
+ this.sourceType = sourceType;
+ }
+
+ @Override
+ public void prepare(Map map, TopologyContext topologyContext) {
+ this.delegate.prepare(map, topologyContext);
+ }
+
+ @Override
+ public String getName(long l, long l1) {
+ return delegate.getName(l, l1);
+ }
+
+ @Override
+ public String getPath() {
+ return delegate.getPath() + "/" + sourceType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
new file mode 100644
index 0000000..0225137
--- /dev/null
+++ b/metron-streaming/Metron-Indexing/src/main/java/org/apache/metron/writer/hdfs/SourceHandler.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.writer.hdfs;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.util.MD5FileUtils;
+import org.apache.hadoop.io.MD5Hash;
+import org.apache.log4j.Logger;
+import org.apache.storm.hdfs.bolt.format.FileNameFormat;
+import org.apache.storm.hdfs.bolt.rotation.FileRotationPolicy;
+import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
+import org.apache.storm.hdfs.bolt.sync.SyncPolicy;
+import org.apache.storm.hdfs.common.rotation.RotationAction;
+import org.apache.storm.hdfs.common.security.HdfsSecurityUtil;
+import org.json.simple.JSONObject;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Writes JSON messages, one per line, to a single output file and rotates that
+ * file according to the configured {@link FileRotationPolicy}.
+ *
+ * All access to the output stream is guarded by {@code writeLock}, because a
+ * {@link TimedRotationPolicy} causes rotation to run on a separate
+ * {@link Timer} thread while {@link #handle(List)} is writing.
+ */
+public class SourceHandler {
+  private static final Logger LOG = Logger.getLogger(SourceHandler.class);
+  List<RotationAction> rotationActions = new ArrayList<>();
+  FileRotationPolicy rotationPolicy;
+  SyncPolicy syncPolicy;
+  FileNameFormat fileNameFormat;
+  // Bytes written to the current file; fed to the sync and rotation policies.
+  private long offset = 0;
+  // Number of rotations so far; passed to the file name format for each new file.
+  private int rotation = 0;
+  private transient FSDataOutputStream out;
+  private transient Object writeLock;
+  protected transient Timer rotationTimer; // only used for TimedRotationPolicy
+  protected transient FileSystem fs;
+  protected transient Path currentFile;
+
+  /**
+   * Stores the collaborators and immediately opens the first output file.
+   *
+   * @param rotationActions actions executed against each file after it is rotated out
+   * @param rotationPolicy  decides when the current file is rotated
+   * @param syncPolicy      decides when written data is synced
+   * @param fileNameFormat  supplies the directory and name of each output file
+   * @param config          configuration map handed to {@link HdfsSecurityUtil#login}
+   * @throws IOException if login, file system lookup or file creation fails
+   */
+  public SourceHandler(List<RotationAction> rotationActions
+                      , FileRotationPolicy rotationPolicy
+                      , SyncPolicy syncPolicy
+                      , FileNameFormat fileNameFormat
+                      , Map config
+                      ) throws IOException {
+    this.rotationActions = rotationActions;
+    this.rotationPolicy = rotationPolicy;
+    this.syncPolicy = syncPolicy;
+    this.fileNameFormat = fileNameFormat;
+    initialize(config);
+  }
+
+  /**
+   * Appends each message to the current output file as one line of JSON.
+   *
+   * After every write the sync policy is consulted and, when it fires, the
+   * stream is hsync'ed. The rotation policy is then consulted and, when it
+   * fires, the file is rotated and the byte offset restarts at zero.
+   *
+   * @param messages the messages to write, in order
+   * @throws Exception if writing, syncing or rotating fails
+   */
+  public void handle(List<JSONObject> messages) throws Exception {
+    for (JSONObject message : messages) {
+      // Encode explicitly as UTF-8 so the file contents do not depend on the
+      // worker JVM's default charset.
+      byte[] bytes = (message.toJSONString() + "\n").getBytes(java.nio.charset.StandardCharsets.UTF_8);
+      synchronized (this.writeLock) {
+        out.write(bytes);
+        this.offset += bytes.length;
+
+        if (this.syncPolicy.mark(null, this.offset)) {
+          if (this.out instanceof HdfsDataOutputStream) {
+            ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+          } else {
+            this.out.hsync();
+          }
+          this.syncPolicy.reset();
+        }
+      }
+
+      if (this.rotationPolicy.mark(null, this.offset)) {
+        rotateOutputFile(); // synchronized
+        this.offset = 0;
+        this.rotationPolicy.reset();
+      }
+    }
+  }
+
+  /**
+   * Logs in, resolves the file system, opens the first output file and, for a
+   * {@link TimedRotationPolicy}, schedules periodic rotation.
+   */
+  private void initialize(Map config) throws IOException {
+    this.writeLock = new Object();
+    Configuration hdfsConfig = new Configuration();
+    // Log in BEFORE asking for the FileSystem, and resolve the FileSystem from the
+    // same Configuration that was handed to login. The previous code fetched the
+    // FileSystem from a separate, throw-away Configuration ahead of the login call.
+    // NOTE(review): this matches the order used by storm-hdfs's own bolts; confirm
+    // against a secured cluster.
+    HdfsSecurityUtil.login(config, hdfsConfig);
+    this.fs = FileSystem.get(hdfsConfig);
+    this.currentFile = createOutputFile();
+    if (this.rotationPolicy instanceof TimedRotationPolicy) {
+      long interval = ((TimedRotationPolicy) this.rotationPolicy).getInterval();
+      this.rotationTimer = new Timer(true);
+      TimerTask task = new TimerTask() {
+        @Override
+        public void run() {
+          try {
+            rotateOutputFile();
+          } catch (IOException e) {
+            LOG.warn("IOException during scheduled file rotation.", e);
+          }
+        }
+      };
+      this.rotationTimer.scheduleAtFixedRate(task, interval, interval);
+    }
+  }
+
+  /**
+   * Closes the current file, opens its successor and then runs every rotation
+   * action against the file that was just closed. Runs under {@code writeLock}.
+   *
+   * @throws IOException if closing, creating or a rotation action fails
+   */
+  protected void rotateOutputFile() throws IOException {
+    LOG.info("Rotating output file...");
+    long start = System.currentTimeMillis();
+    synchronized (this.writeLock) {
+      closeOutputFile();
+      this.rotation++;
+
+      Path newFile = createOutputFile();
+      LOG.info("Performing " + this.rotationActions.size() + " file rotation actions." );
+      // this.currentFile still refers to the file that was just closed.
+      for (RotationAction action : this.rotationActions) {
+        action.execute(this.fs, this.currentFile);
+      }
+      this.currentFile = newFile;
+    }
+    long time = System.currentTimeMillis() - start;
+    LOG.info("File rotation took " + time + " ms.");
+  }
+
+  /**
+   * Opens a new output stream at the location given by the file name format and
+   * makes it the current stream.
+   *
+   * @return the path of the newly opened file
+   */
+  private Path createOutputFile() throws IOException {
+    Path path = new Path(this.fileNameFormat.getPath(), this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
+    if (fs.getScheme().equals("file")) {
+      //in the situation where we're running this in a local filesystem, flushing doesn't work.
+      fs.mkdirs(path.getParent());
+      this.out = new FSDataOutputStream(new FileOutputStream(path.toString()), null);
+    }
+    else {
+      this.out = this.fs.create(path);
+    }
+    return path;
+  }
+
+  /** Closes the current output stream. */
+  private void closeOutputFile() throws IOException {
+    this.out.close();
+  }
+
+
+  /**
+   * Stops scheduled rotation and closes the current output file.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} if the file cannot be closed
+   */
+  public void close() {
+    // Cancel first: otherwise a later timer tick would rotate and reopen a fresh
+    // output file on a handler that has already been closed.
+    if (this.rotationTimer != null) {
+      this.rotationTimer.cancel();
+    }
+    try {
+      // Take the lock so a rotation already in flight finishes before the
+      // stream is closed.
+      synchronized (this.writeLock) {
+        closeOutputFile();
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Unable to close output file.", e);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/LatencySummarizer.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/LatencySummarizer.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/LatencySummarizer.java
new file mode 100644
index 0000000..3066334
--- /dev/null
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/LatencySummarizer.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.utils;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.base.Joiner;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import org.apache.commons.cli.*;
+import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.PrintStream;
+import java.util.*;
+
+/**
+ * Command line tool that reads JSON documents, one per line, from standard input
+ * and summarizes the time elapsed between every pair of ".ts" timestamp fields.
+ *
+ * Within a document the timestamp fields are ordered by value; a pair's
+ * "distance" is how many positions apart its two fields are in that ordering.
+ */
+public class LatencySummarizer {
+  /** An (earlier metric, later metric) pair of metric names. */
+  public static class Pair extends AbstractMap.SimpleEntry<String, String> {
+    public Pair(String key, String value) {
+      super(key, value);
+    }
+  }
+
+  /** Accumulates per-pair latency statistics, grouped by distance. */
+  public static class LatencyStats {
+    private NavigableMap<Integer, Map<Pair, DescriptiveStatistics>> depthMap = new TreeMap<>();
+    // Starts empty (not null) so printSummary() works even if no document was read.
+    private List<String> metrics = new ArrayList<>();
+
+    /** Records the metric ordering of the most recently processed document. */
+    public void updateMetrics(List<String> metrics) {
+      this.metrics = (metrics == null) ? new ArrayList<String>() : metrics;
+    }
+
+    /** Returns the statistics map for the given distance, creating it if absent. */
+    public Map<Pair, DescriptiveStatistics> getStatsMap(int depth) {
+      Map<Pair, DescriptiveStatistics> statsMap = depthMap.get(depth);
+      if (statsMap == null) {
+        statsMap = new HashMap<>();
+        depthMap.put(depth, statsMap);
+      }
+      return statsMap;
+    }
+
+    /** Returns the statistics for the given pair at the given distance, creating them if absent. */
+    public DescriptiveStatistics getStats(int depth, Pair p) {
+      Map<Pair, DescriptiveStatistics> statsMap = getStatsMap(depth);
+      DescriptiveStatistics stats = statsMap.get(p);
+      if (stats == null) {
+        stats = new DescriptiveStatistics();
+        statsMap.put(p, stats);
+      }
+      return stats;
+    }
+
+    /** Adds one observation for the given pair at the given distance. */
+    public void put(int depth, Pair p, double val) {
+      getStats(depth, p).addValue(val);
+    }
+
+    /**
+     * Prints a titled summary of the statistics.
+     *
+     * @param title      heading printed before the figures
+     * @param statistics the statistics to summarize
+     * @param pw         destination stream
+     * @param meanOnly   when true only the mean is printed; otherwise min, max,
+     *                   selected percentiles and the standard deviation follow
+     */
+    public static void summary(String title, DescriptiveStatistics statistics, PrintStream pw, boolean meanOnly) {
+      if (meanOnly) {
+        pw.println(title + ": "
+                + "\n\tMean: " + statistics.getMean()
+        );
+      }
+      else {
+        pw.println(title + ": "
+                + "\n\tMean: " + statistics.getMean()
+                + "\n\tMin: " + statistics.getMin()
+                + "\n\t1st: " + statistics.getPercentile(1)
+                + "\n\t5th: " + statistics.getPercentile(5)
+                + "\n\t10th: " + statistics.getPercentile(10)
+                + "\n\t25th: " + statistics.getPercentile(25)
+                + "\n\t50th: " + statistics.getPercentile(50)
+                + "\n\t90th: " + statistics.getPercentile(90)
+                + "\n\t95th: " + statistics.getPercentile(95)
+                + "\n\t99th: " + statistics.getPercentile(99)
+                + "\n\tMax: " + statistics.getMax()
+                + "\n\tStdDev: " + statistics.getStandardDeviation()
+        );
+      }
+    }
+
+    /**
+     * Prints every pair recorded at the given distance to standard output,
+     * ordered by mean latency, largest first. A distance with no recorded
+     * pairs prints only its heading.
+     */
+    public void printDepthSummary(int depth, boolean meanOnly) {
+      Map<Pair, DescriptiveStatistics> statsMap = depthMap.get(depth);
+      System.out.println("\nDistance " + depth);
+      System.out.println("----------------\n");
+      if (statsMap == null) {
+        // Nothing was recorded at this distance.
+        return;
+      }
+      List<Map.Entry<Pair, DescriptiveStatistics>> sortedStats =
+              new ArrayList<Map.Entry<Pair, DescriptiveStatistics>>(statsMap.entrySet());
+      Collections.sort(sortedStats, new Comparator<Map.Entry<Pair, DescriptiveStatistics>>() {
+        @Override
+        public int compare(Map.Entry<Pair, DescriptiveStatistics> o1, Map.Entry<Pair, DescriptiveStatistics> o2) {
+          return -1*Double.compare(o1.getValue().getMean(), o2.getValue().getMean());
+        }
+      });
+      for (Map.Entry<Pair, DescriptiveStatistics> stats : sortedStats) {
+        summary(stats.getKey().getKey() + " -> " + stats.getKey().getValue(), stats.getValue(), System.out, meanOnly);
+      }
+    }
+
+    /** Prints the most recent metric flow followed by a summary for every distance, nearest first. */
+    public void printSummary(boolean meanOnly) {
+      System.out.println("Flow:");
+      System.out.println("\t" + Joiner.on(" -> ").join(metrics));
+      System.out.println("\nSUMMARY BY DISTANCE\n--------------------------");
+      for (int depth : depthMap.keySet()) {
+        printDepthSummary(depth, meanOnly);
+      }
+    }
+
+  }
+
+  /** Returns {@code s} without its last dot-separated token, e.g. "a.b.ts" becomes "a.b". */
+  public static String getBaseMetric(String s) {
+    Iterable<String> tokenIt = Splitter.on('.').split(s);
+    int num = Iterables.size(tokenIt);
+    return Joiner.on('.').join(Iterables.limit(tokenIt, num-1));
+  }
+
+  /**
+   * Folds one document into the running statistics.
+   *
+   * Every field whose key ends in ".ts" and whose value is non-null is parsed as
+   * a long timestamp. The metrics are ordered by timestamp (ties broken by metric
+   * name, so metrics sharing a timestamp are all kept), and for each ordered pair
+   * the difference later - earlier is recorded at distance = positions apart.
+   *
+   * @param stats accumulator to update
+   * @param doc   the parsed document
+   * @throws NumberFormatException if a ".ts" value is not a valid long
+   */
+  public static void updateStats(LatencyStats stats, Map<String, Object> doc) {
+    final Map<String, Long> latencyMap = new HashMap<>();
+    for (Map.Entry<String, Object> kv : doc.entrySet()) {
+      // A null timestamp carries no information; skip it rather than fail.
+      if (kv.getKey().endsWith(".ts") && kv.getValue() != null) {
+        String base = getBaseMetric(kv.getKey());
+        long latency = Long.parseLong(kv.getValue().toString());
+        latencyMap.put(base, latency);
+      }
+    }
+    // Sort the metric names by timestamp instead of keying a map by timestamp,
+    // which silently dropped all but one of any metrics sharing a timestamp.
+    List<String> metrics = new ArrayList<String>(latencyMap.keySet());
+    Collections.sort(metrics, new Comparator<String>() {
+      @Override
+      public int compare(String a, String b) {
+        int byTime = Long.compare(latencyMap.get(a), latencyMap.get(b));
+        return byTime != 0 ? byTime : a.compareTo(b);
+      }
+    });
+    stats.updateMetrics(metrics);
+    for (int i = 0; i < metrics.size(); ++i) {
+      for (int j = i+1; j < metrics.size(); ++j) {
+        Pair p = new Pair(metrics.get(i), metrics.get(j));
+        long ms = latencyMap.get(metrics.get(j)) - latencyMap.get(metrics.get(i));
+        stats.put(j-i, p, ms);
+      }
+    }
+  }
+
+  /** Prints the usage screen for the given options. */
+  private static void printHelp(Options options) {
+    final HelpFormatter usageFormatter = new HelpFormatter();
+    usageFormatter.printHelp(LatencySummarizer.class.getSimpleName().toLowerCase(), null, options, null, true);
+  }
+
+  /**
+   * Entry point. Options: -h/--help prints usage and exits; -m/--mean_only
+   * restricts each summary to the mean. Documents are read from standard input.
+   */
+  public static void main(String... argv) throws IOException {
+    Options options = new Options();
+    {
+      Option o = new Option("h", "help", false, "This screen");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    {
+      Option o = new Option("m", "mean_only", false, "Print the mean only when we summarize");
+      o.setRequired(false);
+      options.addOption(o);
+    }
+    CommandLineParser parser = new PosixParser();
+    CommandLine cmd = null;
+    try {
+      cmd = parser.parse(options, argv);
+    }
+    catch (ParseException pe) {
+      pe.printStackTrace();
+      printHelp(options);
+      System.exit(-1);
+    }
+    if (cmd.hasOption("h")) {
+      printHelp(options);
+      System.exit(0);
+    }
+    LatencyStats statsMap = new LatencyStats();
+    BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
+    for (String line = null; (line = reader.readLine()) != null;) {
+      Map<String, Object> doc = JSONUtils.INSTANCE.load(line, new TypeReference<HashMap<String, Object>>() {});
+      updateStats(statsMap, doc);
+    }
+    statsMap.printSummary(cmd.hasOption('m'));
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
index 13ccd0c..127ca1f 100644
--- a/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
+++ b/metron-streaming/Metron-Topologies/src/main/java/org/apache/metron/utils/SourceConfigUtils.java
@@ -83,31 +83,52 @@ public class SourceConfigUtils {
public static void main(String[] args) {
- Options options = new Options();
- options.addOption("p", true, "Path to source option files");
- options.addOption("z", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+ Options options = new Options();
+ {
+ Option o = new Option("h", "help", false, "This screen");
+ o.setRequired(false);
+ options.addOption(o);
+ }
+ {
+ Option o = new Option("p", "config_files", true, "Path to the source config files. Must be named like \"$source\"-config.json");
+ o.setArgName("DIR_NAME");
+ o.setRequired(false);
+ options.addOption(o);
+ }
+ {
+ Option o = new Option("z", "zk", true, "Zookeeper Quroum URL (zk1:2181,zk2:2181,...");
+ o.setArgName("ZK_QUORUM");
+ o.setRequired(true);
+ options.addOption(o);
+ }
try {
- CommandLineParser parser = new BasicParser();
- CommandLine cmd = parser.parse( options, args);
-
- if( !cmd.hasOption('p') || !cmd.hasOption('z') ){
- final PrintWriter writer = new PrintWriter(System.out);
+ CommandLineParser parser = new PosixParser();
+ CommandLine cmd = null;
+ try {
+ cmd = parser.parse(options, args);
+ }
+ catch(ParseException pe) {
+ pe.printStackTrace();
final HelpFormatter usageFormatter = new HelpFormatter();
- usageFormatter.printUsage(writer, 80, "Apache Metron SourceConfigUtils", options);
- writer.close();
- System.exit(1);
+ usageFormatter.printHelp("SourceConfigUtils", null, options, null, true);
+ System.exit(-1);
+ }
+ if( cmd.hasOption("h") ){
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("SourceConfigUtils", null, options, null, true);
+ System.exit(0);
}
- String sourcePath = cmd.getOptionValue('p');
- String zkQuorum = cmd.getOptionValue('z');
-
-
- File root = new File(sourcePath);
+ String zkQuorum = cmd.getOptionValue("z");
+ if(cmd.hasOption("p")) {
+ String sourcePath = cmd.getOptionValue("p");
+ File root = new File(sourcePath);
- if( root.isDirectory() ) {
- for (File child : root.listFiles()) {
- writeToZookeeperFromFile(child.getName().replaceFirst("-config.json", ""), child.getPath(), zkQuorum);
+ if (root.isDirectory()) {
+ for (File child : root.listFiles()) {
+ writeToZookeeperFromFile(child.getName().replaceFirst("-config.json", ""), child.getPath(), zkQuorum);
+ }
}
}
@@ -115,6 +136,8 @@ public class SourceConfigUtils {
} catch (Exception e) {
e.printStackTrace();
+ System.exit(-1);
}
+
}
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
index 5d2786d..1f61609 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/etc/env/config.properties
@@ -86,6 +86,7 @@ bolt.hdfs.file.system.url=hdfs://iot01.cloud.hortonworks.com:8020
bolt.hdfs.wip.file.path=/paloalto/wip
bolt.hdfs.finished.file.path=/paloalto/rotated
bolt.hdfs.compression.codec.class=org.apache.hadoop.io.compress.SnappyCodec
+index.hdfs.output=/tmp/metron/enriched
##### HBase #####
bolt.hbase.table.name=pcap_test
@@ -103,4 +104,3 @@ threat.intel.tracker.table=
threat.intel.tracker.cf=
threat.intel.ip.table=
threat.intel.ip.cf=
-
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
index ec36f2c..45d8a50 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/remote.yaml
@@ -101,7 +101,25 @@ components:
args:
- ref: "ipThreatIntelEnrichment"
+ - id: "fileNameFormat"
+ className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
+ configMethods:
+ - name: "withPrefix"
+ args:
+ - "enrichment-"
+ - name: "withExtension"
+ args:
+ - ".json"
+ - name: "withPath"
+ args:
+ - "${index.hdfs.output}"
#indexing
+ - id: "hdfsWriter"
+ className: "org.apache.metron.writer.hdfs.HdfsWriter"
+ configMethods:
+ - name: "withFileNameFormat"
+ args:
+ - ref: "fileNameFormat"
- id: "indexWriter"
className: "org.apache.metron.writer.ElasticsearchWriter"
constructorArgs:
@@ -228,6 +246,14 @@ bolts:
- name: "withBulkMessageWriter"
args:
- ref: "indexWriter"
+ - id: "hdfsIndexingBolt"
+ className: "org.apache.metron.bolt.BulkMessageWriterBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withBulkMessageWriter"
+ args:
+ - ref: "hdfsWriter"
streams:
@@ -314,6 +340,13 @@ streams:
streamId: "message"
type: FIELDS
args: ["key"]
+ - name: "threatIntelJoin -> hdfs"
+ from: "threatIntelJoinBolt"
+ to: "hdfsIndexingBolt"
+ grouping:
+ streamId: "message"
+ type: SHUFFLE
+
- name: "indexingBolt -> errorIndexingBolt"
from: "indexingBolt"
to: "indexingBolt"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
index 0e530f5..e1e0ad1 100644
--- a/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
+++ b/metron-streaming/Metron-Topologies/src/main/resources/Metron_Configs/topologies/enrichment/test.yaml
@@ -83,7 +83,25 @@ components:
args:
- ref: "ipThreatIntelEnrichment"
+ - id: "fileNameFormat"
+ className: "org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat"
+ configMethods:
+ - name: "withPrefix"
+ args:
+ - "enrichment-"
+ - name: "withExtension"
+ args:
+ - ".json"
+ - name: "withPath"
+ args:
+ - "${index.hdfs.output}"
#indexing
+ - id: "hdfsWriter"
+ className: "org.apache.metron.writer.hdfs.HdfsWriter"
+ configMethods:
+ - name: "withFileNameFormat"
+ args:
+ - ref: "fileNameFormat"
- id: "indexWriter"
className: "org.apache.metron.writer.ElasticsearchWriter"
constructorArgs:
@@ -220,6 +238,14 @@ bolts:
- name: "withBulkMessageWriter"
args:
- ref: "indexWriter"
+ - id: "hdfsIndexingBolt"
+ className: "org.apache.metron.bolt.BulkMessageWriterBolt"
+ constructorArgs:
+ - "${kafka.zk}"
+ configMethods:
+ - name: "withBulkMessageWriter"
+ args:
+ - ref: "hdfsWriter"
streams:
@@ -306,6 +332,14 @@ streams:
streamId: "message"
type: FIELDS
args: ["key"]
+
+ - name: "threatIntelJoin -> hdfs"
+ from: "threatIntelJoinBolt"
+ to: "hdfsIndexingBolt"
+ grouping:
+ streamId: "message"
+ type: SHUFFLE
+
- name: "indexingBolt -> errorIndexingBolt"
from: "indexingBolt"
to: "indexingBolt"
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/d043f245/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
index 6e62e84..4d5f897 100644
--- a/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Topologies/src/test/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -17,8 +17,10 @@
*/
package org.apache.metron.integration;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.*;
import com.google.common.collect.Iterables;
+import com.google.common.io.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.Constants;
@@ -43,10 +45,7 @@ import org.junit.Test;
import org.apache.metron.utils.JSONUtils;
import javax.annotation.Nullable;
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.Serializable;
+import java.io.*;
import java.text.SimpleDateFormat;
import java.util.*;
@@ -54,6 +53,7 @@ public class EnrichmentIntegrationTest {
private String fluxPath = "src/main/resources/Metron_Configs/topologies/enrichment/test.yaml";
private String indexDir = "target/elasticsearch";
+ private String hdfsDir = "target/enrichmentIntegrationTest/hdfs";
private String sampleParsedPath = "src/main/resources/SampleParsed/YafExampleParsed";
private String sampleIndexedPath = "src/main/resources/SampleIndexed/YafIndexed";
private Map<String, String> sourceConfigs = new HashMap<>();
@@ -66,9 +66,68 @@ public class EnrichmentIntegrationTest {
}
}
+ public static void cleanHdfsDir(String hdfsDirStr) {
+ File hdfsDir = new File(hdfsDirStr);
+ Stack<File> fs = new Stack<>();
+ if(hdfsDir.exists()) {
+ fs.push(hdfsDir);
+ while(!fs.empty()) {
+ File f = fs.pop();
+ if (f.isDirectory()) {
+ for(File child : f.listFiles()) {
+ fs.push(child);
+ }
+ }
+ else {
+ if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+ f.delete();
+ }
+ }
+ }
+ }
+ }
+
+ public static List<Map<String, Object> > readDocsFromDisk(String hdfsDirStr) throws IOException {
+ List<Map<String, Object>> ret = new ArrayList<>();
+ File hdfsDir = new File(hdfsDirStr);
+ Stack<File> fs = new Stack<>();
+ if(hdfsDir.exists()) {
+ fs.push(hdfsDir);
+ while(!fs.empty()) {
+ File f = fs.pop();
+ if(f.isDirectory()) {
+ for (File child : f.listFiles()) {
+ fs.push(child);
+ }
+ }
+ else {
+ System.out.println("Processed " + f);
+ if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) {
+ List<byte[]> data = TestUtils.readSampleData(f.getPath());
+ Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() {
+ @Nullable
+ @Override
+ public Map<String, Object> apply(@Nullable byte[] bytes) {
+ String s = new String(bytes);
+ try {
+ return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() {
+ });
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }));
+ }
+ }
+ }
+ }
+ return ret;
+ }
+
@Test
public void test() throws Exception {
+ cleanHdfsDir(hdfsDir);
final String dateFormat = "yyyy.MM.dd.hh";
final String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
String yafConfig = "{\n" +
@@ -103,6 +162,7 @@ public class EnrichmentIntegrationTest {
setProperty("es.port", "9300");
setProperty("es.ip", "localhost");
setProperty("index.date.format", dateFormat);
+ setProperty("index.hdfs.output", hdfsDir);
}};
final KafkaWithZKComponent kafkaComponent = new KafkaWithZKComponent().withTopics(new ArrayList<KafkaWithZKComponent.Topic>() {{
add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
@@ -147,52 +207,69 @@ public class EnrichmentIntegrationTest {
.withComponent("elasticsearch", esComponent)
.withComponent("storm", fluxComponent)
.withMillisecondsBetweenAttempts(10000)
- .withNumRetries(30)
- .withMaxTimeMS(300000)
+ .withNumRetries(10)
.build();
runner.start();
- fluxComponent.submitTopology();
- kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
- List<Map<String, Object>> docs =
- runner.process(new Processor<List<Map<String, Object>>> () {
- List<Map<String, Object>> docs = null;
- public ReadinessState process(ComponentRunner runner){
- ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
- if(elasticSearchComponent.hasIndex(index)) {
- try {
- docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf_doc");
- } catch (IOException e) {
- throw new IllegalStateException("Unable to retrieve indexed documents.", e);
- }
- if(docs.size() < inputMessages.size()) {
+ try {
+ fluxComponent.submitTopology();
+ kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
+ List<Map<String, Object>> docs =
+ runner.process(new Processor<List<Map<String, Object>>>() {
+ List<Map<String, Object>> docs = null;
+
+ public ReadinessState process(ComponentRunner runner) {
+ ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
+ if (elasticSearchComponent.hasIndex(index)) {
+ List<Map<String, Object>> docsFromDisk;
+ try {
+ docs = elasticSearchComponent.getAllIndexedDocs(index, "yaf_doc");
+ docsFromDisk = readDocsFromDisk(hdfsDir);
+ System.out.println(docs.size() + " vs " + inputMessages.size() + " vs " + docsFromDisk.size());
+ } catch (IOException e) {
+ throw new IllegalStateException("Unable to retrieve indexed documents.", e);
+ }
+ if (docs.size() < inputMessages.size() || docs.size() != docsFromDisk.size()) {
+ return ReadinessState.NOT_READY;
+ } else {
+ return ReadinessState.READY;
+ }
+ } else {
return ReadinessState.NOT_READY;
}
- else {
- return ReadinessState.READY;
- }
}
- else {
- return ReadinessState.NOT_READY;
+
+ public List<Map<String, Object>> getResult() {
+ return docs;
}
- }
+ });
- public List<Map<String, Object>> getResult() {
- return docs;
- }
- });
- List<byte[]> sampleIndexedMessages = TestUtils.readSampleData(sampleIndexedPath);
- Assert.assertEquals(sampleIndexedMessages.size(), docs.size());
+ Assert.assertEquals(inputMessages.size(), docs.size());
+
+ for (Map<String, Object> doc : docs) {
+ baseValidation(doc);
+ hostEnrichmentValidation(doc);
+ geoEnrichmentValidation(doc);
+ threatIntelValidation(doc);
- for (Map<String, Object> doc : docs) {
- baseValidation(doc);
+ }
+ List<Map<String, Object>> docsFromDisk = readDocsFromDisk(hdfsDir);
+ Assert.assertEquals(docsFromDisk.size(), docs.size()) ;
- hostEnrichmentValidation(doc);
- geoEnrichmentValidation(doc);
- threatIntelValidation(doc);
+ Assert.assertEquals(new File(hdfsDir).list().length, 1);
+ Assert.assertEquals(new File(hdfsDir).list()[0], "yaf_doc");
+ for (Map<String, Object> doc : docsFromDisk) {
+ baseValidation(doc);
+ hostEnrichmentValidation(doc);
+ geoEnrichmentValidation(doc);
+ threatIntelValidation(doc);
+ }
+ }
+ finally {
+ cleanHdfsDir(hdfsDir);
+ runner.stop();
}
- runner.stop();
}
public static void baseValidation(Map<String, Object> jsonDoc) {