You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ly...@apache.org on 2016/04/19 18:18:22 UTC
incubator-metron git commit: METRON-96: Create data purging script
for ES Index closes apache/incubator-metron#79
Repository: incubator-metron
Updated Branches:
refs/heads/master 25e732661 -> 4931f2257
METRON-96: Create data purging script for ES Index closes apache/incubator-metron#79
Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/4931f225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/4931f225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/4931f225
Branch: refs/heads/master
Commit: 4931f22571ce98ed2b5a068052ad71d0a2348f40
Parents: 25e7326
Author: David Lyle <dl...@gmail.com>
Authored: Tue Apr 19 12:17:56 2016 -0400
Committer: David Lyle <dl...@gmail.com>
Committed: Tue Apr 19 12:17:56 2016 -0400
----------------------------------------------------------------------
.../roles/metron_streaming/defaults/main.yml | 10 +-
.../roles/metron_streaming/tasks/es_purge.yml | 42 ++++
.../roles/metron_streaming/tasks/hdfs_purge.yml | 12 +-
.../roles/metron_streaming/tasks/main.yml | 1 +
.../templates/config/elasticsearch.global.json | 6 +-
.../org/apache/metron/domain/Configuration.java | 60 ++++++
.../apache/metron/domain/ConfigurationTest.java | 90 ++++++++
.../src/test/resources/config/global.json | 3 +
.../src/test/resources/config/sensors/bro.json | 19 ++
metron-streaming/Metron-DataLoads/pom.xml | 116 +++++-----
.../main/bash/prune_elasticsearch_indices.sh | 21 ++
.../metron/dataloads/bulk/DataPruner.java | 66 ++++++
.../dataloads/bulk/ElasticsearchDataPruner.java | 135 ++++++++++++
.../bulk/ElasticsearchDataPrunerRunner.java | 191 +++++++++++++++++
.../metron/dataloads/bulk/HDFSDataPruner.java | 44 +---
.../dataloads/bulk/StartDateException.java | 31 +++
.../ElasticsearchDataPrunerIntegrationTest.java | 156 ++++++++++++++
.../bulk/ElasticsearchDataPrunerRunnerTest.java | 72 +++++++
.../bulk/ElasticsearchDataPrunerTest.java | 210 +++++++++++++++++++
.../dataloads/bulk/HDFSDataPrunerTest.java | 18 +-
.../src/test/resources/log4j2.xml | 31 ---
.../src/test/resources/logging.properties | 17 --
.../metron/writer/ElasticsearchWriter.java | 2 +-
.../ElasticsearchEnrichmentIntegrationTest.java | 2 +-
.../integration/EnrichmentIntegrationTest.java | 2 +-
.../java/org/apache/metron/util/SampleUtil.java | 3 +-
.../main/resources/sample/config/global.json | 2 +-
metron-streaming/pom.xml | 150 +++++++++----
28 files changed, 1315 insertions(+), 197 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/defaults/main.yml b/deployment/roles/metron_streaming/defaults/main.yml
index 0eff59f..f0f605d 100644
--- a/deployment/roles/metron_streaming/defaults/main.yml
+++ b/deployment/roles/metron_streaming/defaults/main.yml
@@ -56,13 +56,19 @@ storm_topologies:
- "{{ metron_directory }}/config/topologies/enrichment/remote.yaml"
hdfs_retention_days: 30
-hdfs_bro_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/bro_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/bro-purge/cron-bro-purge.log 2>&1"
-hdfs_yaf_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/yaf_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/yaf-purge/cron-yaf-purge.log 2>&1"
+hdfs_bro_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/bro_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/bro-purge/cron-hdfs-bro-purge.log 2>&1"
+hdfs_yaf_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/yaf_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/yaf-purge/cron-hdfs-yaf-purge.log 2>&1"
+hdfs_snort_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/snort_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/snort-purge/cron-hdfs-snort-purge.log 2>&1"
elasticsearch_config_path: /etc/elasticsearch
elasticsearch_cluster_name: metron
elasticsearch_transport_port: 9300
+es_retention_days: 30
+es_bro_purge_cronjob: "{{ metron_directory }}/bin/prune_elasticsearch_indices.sh -z {{ zookeeper_url }} -p bro_index_ -s $(date -d '{{ es_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/bro-purge/cron-es-bro-purge.log 2>&1"
+es_yaf_purge_cronjob: "{{ metron_directory }}/bin/prune_elasticsearch_indices.sh -z {{ zookeeper_url }} -p yaf_index_ -s $(date -d '{{ es_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/yaf-purge/cron-es-yaf-purge.log 2>&1"
+es_snort_purge_cronjob: "{{ metron_directory }}/bin/prune_elasticsearch_indices.sh -z {{ zookeeper_url }} -p snort_index_ -s $(date -d '{{ es_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/snort-purge/cron-es-snort-purge.log 2>&1"
+# Each *_purge_cronjob runs daily; -s (N days ago) and -n 1 select the single day that just aged out of retention.
metron_hdfs_output_dir: "/apps/metron"
metron_hdfs_rotation_policy: org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy
metron_hdfs_rotation_policy_count: 1
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/tasks/es_purge.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/es_purge.yml b/deployment/roles/metron_streaming/tasks/es_purge.yml
new file mode 100644
index 0000000..22616ca
--- /dev/null
+++ b/deployment/roles/metron_streaming/tasks/es_purge.yml
@@ -0,0 +1,42 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+---
+- name: Create Empty Log Files for ES Purge
+ file:
+ path: "{{ item }}"
+ state: touch
+ owner: hdfs
+ group: hdfs
+ mode: 0644
+ with_items:
+ - /var/log/bro-purge/cron-es-bro-purge.log
+ - /var/log/yaf-purge/cron-es-yaf-purge.log
+ - /var/log/snort-purge/cron-es-snort-purge.log
+# The /var/log/*-purge directories are presumably created by hdfs_purge.yml, which main.yml includes before this file.
+# Each daily job deletes the indices dated in the 1 day (-n 1) before the date es_retention_days ago (-s).
+- name: Schedule daily Elasticsearch index purge.
+ cron:
+ name: "{{ item.name }}"
+ job: "{{ item.job }}"
+ special_time: daily
+ user: hdfs
+ with_items:
+ - { name: "bro_es_purge", job: "{{ es_bro_purge_cronjob }}" }
+ - { name: "yaf_es_purge", job: "{{ es_yaf_purge_cronjob }}" }
+ - { name: "snort_es_purge", job: "{{ es_snort_purge_cronjob }}" }
+
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/tasks/hdfs_purge.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/hdfs_purge.yml b/deployment/roles/metron_streaming/tasks/hdfs_purge.yml
index 1f63451..33442e4 100644
--- a/deployment/roles/metron_streaming/tasks/hdfs_purge.yml
+++ b/deployment/roles/metron_streaming/tasks/hdfs_purge.yml
@@ -25,6 +25,7 @@
with_items:
- /var/log/bro-purge
- /var/log/yaf-purge
+ - /var/log/snort-purge
- name: Create Empty Log Files for HDFS Purge
file:
@@ -34,9 +35,9 @@
group: hdfs
mode: 0644
with_items:
- - /var/log/bro-purge/cron-bro-purge.log
- - /var/log/yaf-purge/cron-yaf-purge.log
-
+ - /var/log/bro-purge/cron-hdfs-bro-purge.log
+ - /var/log/yaf-purge/cron-hdfs-yaf-purge.log
+ - /var/log/snort-purge/cron-hdfs-snort-purge.log
- name: Purge HDFS Sensor Data every 30 days.
cron:
@@ -45,6 +46,18 @@
special_time: daily
user: hdfs
with_items:
- - { name: "bro_purge", job: "{{ hdfs_bro_purge_cronjob }}" }
- - { name: "yaf_purge", job: "{{ hdfs_yaf_purge_cronjob }}" }
+ - { name: "bro_hdfs_purge", job: "{{ hdfs_bro_purge_cronjob }}" }
+ - { name: "yaf_hdfs_purge", job: "{{ hdfs_yaf_purge_cronjob }}" }
+ - { name: "snort_hdfs_purge", job: "{{ hdfs_snort_purge_cronjob }}" }
+
+# These jobs were previously registered as "bro_purge"/"yaf_purge". The cron module identifies
+# entries by name, so remove the old ones or upgraded hosts keep running both sets of jobs.
+- name: Remove HDFS purge cron jobs registered under their old names
+  cron:
+    name: "{{ item }}"
+    state: absent
+    user: hdfs
+  with_items:
+    - bro_purge
+    - yaf_purge
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml
index 152af56..e076645 100644
--- a/deployment/roles/metron_streaming/tasks/main.yml
+++ b/deployment/roles/metron_streaming/tasks/main.yml
@@ -133,3 +133,4 @@
- include: hdfs_purge.yml
+- include: es_purge.yml  # keep after hdfs_purge.yml, which presumably creates the /var/log/*-purge directories es_purge.yml writes into
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json b/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
index aa1076c..8177102 100644
--- a/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
+++ b/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
@@ -1,6 +1,6 @@
{
"es.clustername": "{{ elasticsearch_cluster_name }}",
"es.ip": "{{ groups.search[0] }}",
- "es.port": {{ elasticsearch_transport_port }},
- "es.date.format": "yyyy.MM.dd.hh"
-}
\ No newline at end of file
+ "es.port": "{{ elasticsearch_transport_port }}",
+ "es.date.format": "yyyy.MM.dd.HH"
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
new file mode 100644
index 0000000..d21c686
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.utils.ConfigurationsUtils;
+
+import java.nio.file.Path;
+import java.util.Map;
+/** Configurations that update() reloads either from ZooKeeper (via a CuratorFramework) or from a directory of config files. */
+public class Configuration extends Configurations {
+ // When non-null, update() reads from ZooKeeper and configFileRoot is not consulted.
+ protected CuratorFramework curatorFramework = null;
+ private Path configFileRoot;
+ /** Reads configuration from ZooKeeper through the given client on each update(). */
+ public Configuration(CuratorFramework curatorFramework){
+
+ this.curatorFramework = curatorFramework;
+
+ }
+
+ /** Reads the global config and the sensor enrichment configs from files under configFileRoot on each update(). */
+ public Configuration(Path configFileRoot){
+
+ this.configFileRoot = configFileRoot;
+ }
+ /** Reloads all configs from ZooKeeper if a client is set, otherwise from configFileRoot; propagates any exception from ConfigurationsUtils. */
+ public void update() throws Exception {
+
+ if( null != curatorFramework ) {
+
+ ConfigurationsUtils.updateConfigsFromZookeeper(this, this.curatorFramework);
+
+ } else {
+ // No ZooKeeper client: read the global config, then each sensor's enrichment config, from disk.
+ updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(configFileRoot.toAbsolutePath().toString()));
+ Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(configFileRoot.toAbsolutePath().toString());
+ for(String sensorType: sensorEnrichmentConfigs.keySet()) {
+ updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
+ }
+
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationTest.java
new file mode 100644
index 0000000..74200b2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.metron.Constants;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+/** Checks that Configuration loads the global config both from files and from a mocked ZooKeeper client. */
+public class ConfigurationTest {
+ // Entry expected in the global config from both sources (config/global.json and mockGlobalData()).
+ private static final String TEST_PROPERTY = "configuration.class.test.property";
+ private static final String TEST_VALUE = "Configuration";
+ @Test
+ public void testCanReadFromFile() throws Exception {
+ // Reads the fixtures rooted at src/test/resources/config/.
+ Configuration configuration = new Configuration(Paths.get("./src/test/resources/config/"));
+ configuration.update();
+
+ checkResult(configuration);
+
+ }
+ /** Mocks ZooKeeper: the global root returns mockGlobalData() and every child listing is empty. */
+ @Test
+ public void testCanReadFromZookeeper() throws Exception {
+
+ CuratorFramework curatorFramework = mock(CuratorFramework.class);
+ ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
+ GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+ GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
+ // NOTE(review): existsBuilder is not stubbed, so any checkExists().forPath(...) call returns Mockito's default (null).
+ when(getDataBuilder.forPath(Constants.ZOOKEEPER_GLOBAL_ROOT)).thenReturn(mockGlobalData());
+ when(curatorFramework.checkExists()).thenReturn(existsBuilder);
+ when(curatorFramework.getData()).thenReturn(getDataBuilder);
+ when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+ when(getChildrenBuilder.forPath(anyString())).thenReturn(Collections.<String> emptyList());
+ // The Path is a placeholder: with curatorFramework set, update() reads from ZooKeeper instead.
+ Configuration configuration = new Configuration(Paths.get("foo"));
+ configuration.curatorFramework = curatorFramework;
+ configuration.update();
+
+ checkResult(configuration);
+ }
+
+ /** JSON bytes of a global config holding only TEST_PROPERTY = TEST_VALUE. */
+ private byte[] mockGlobalData(){
+
+ JSONObject global = new JSONObject();
+ global.put(TEST_PROPERTY, TEST_VALUE);
+ return global.toString().getBytes();
+ // NOTE(review): getBytes() above uses the platform default charset.
+ }
+
+ /** Asserts the global config has exactly one entry: TEST_PROPERTY mapped to TEST_VALUE. */
+ private void checkResult( Configuration configuration ){
+
+ assertEquals("File contains 1 entry: ",1,configuration.getGlobalConfig().size());
+ String testValue = configuration.getGlobalConfig().get(TEST_PROPERTY).toString();
+ assertEquals(TEST_PROPERTY + " should be \"" + TEST_VALUE + "\"",TEST_VALUE,testValue);
+
+
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Common/src/test/resources/config/global.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/global.json b/metron-streaming/Metron-Common/src/test/resources/config/global.json
new file mode 100644
index 0000000..44ce6b1
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/resources/config/global.json
@@ -0,0 +1,3 @@
+{
+ "configuration.class.test.property": "Configuration"
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Common/src/test/resources/config/sensors/bro.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/sensors/bro.json b/metron-streaming/Metron-Common/src/test/resources/config/sensors/bro.json
new file mode 100644
index 0000000..8886495
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/resources/config/sensors/bro.json
@@ -0,0 +1,19 @@
+{
+ "index": "bro",
+ "batchSize": 5,
+ "enrichmentFieldMap":
+ {
+ "geo": ["ip_dst_addr", "ip_src_addr"],
+ "host": ["host"]
+ },
+ "threatIntelFieldMap":
+ {
+ "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
+ },
+ "fieldToThreatIntelTypeMap":
+ {
+ "ip_dst_addr" : [ "malicious_ip" ]
+ ,"ip_src_addr" : [ "malicious_ip" ]
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/pom.xml b/metron-streaming/Metron-DataLoads/pom.xml
index 53c39da..2d49995 100644
--- a/metron-streaming/Metron-DataLoads/pom.xml
+++ b/metron-streaming/Metron-DataLoads/pom.xml
@@ -120,6 +120,41 @@
<version>${httpcore.version}</version>
</dependency>
<dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${global_elasticsearch_version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.carrotsearch.randomizedtesting</groupId>
+ <artifactId>randomizedtesting-runner</artifactId>
+ <version>2.1.14</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.elasticsearch</groupId>
+ <artifactId>elasticsearch</artifactId>
+ <version>${global_elasticsearch_version}</version>
+ <type>test-jar</type> <!-- NOTE(review): presumably the ES test framework used by ElasticsearchDataPrunerIntegrationTest -->
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-test-framework</artifactId>
+ <version>4.10.4</version> <!-- NOTE(review): hard-coded here and for lucene-core below; presumably must match the Lucene used by global_elasticsearch_version -->
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.lucene</groupId>
+ <artifactId>lucene-core</artifactId>
+ <version>4.10.4</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-testing-util</artifactId>
<version>${global_hbase_version}</version>
@@ -138,21 +173,44 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>log4j-over-slf4j</artifactId>
- <version>${global_slf4j_version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>Metron-Testing</artifactId>
<version>${project.parent.version}</version>
<scope>test</scope>
+ <exclusions> <!-- NOTE(review): presumably keeps a second SLF4J binding off the test classpath -->
+ <exclusion>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
- <version>${global_mockito_version}</version>
+ <scope>test</scope> <!-- no version here; presumably managed by the parent pom -->
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-module-junit4</artifactId>
+ <version>1.6.2</version> <!-- NOTE(review): the three PowerMock artifacts share this version; consider a property -->
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-mockito</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.powermock</groupId>
+ <artifactId>powermock-api-easymock</artifactId>
+ <version>1.6.2</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.easymock</groupId>
+ <artifactId>easymock</artifactId>
+ <version>3.4</version>
<scope>test</scope>
</dependency>
</dependencies>
@@ -167,56 +225,14 @@
</resources>
<plugins>
<plugin>
- <!-- Separates the unit tests from the integration tests. -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
- <version>2.12.4</version>
<configuration>
- <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
- <argLine>-Xmx2048m -XX:MaxPermSize=256m -Djava.util.logging.config.file=logging.properties</argLine>
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m -XX:-UseSplitVerifier</argLine> <!-- NOTE(review): the executions below were removed but skip=true remains; confirm the parent pom supplies executions that override it, or no tests run. MaxPermSize and UseSplitVerifier are JDK 7 options that JDK 8 ignores with a warning. -->
<skip>true</skip>
- <!-- Show 100% of the lines from the stack trace (doesn't work) -->
<trimStackTrace>false</trimStackTrace>
-
</configuration>
- <executions>
- <execution>
- <id>unit-tests</id>
- <phase>test</phase>
- <goals>
- <goal>test</goal>
- </goals>
- <configuration>
- <!-- Never skip running the tests when the test phase is invoked -->
- <skip>false</skip>
- <includes>
- <!-- Include unit tests within integration-test phase. -->
- <include>**/*Test.java</include>
- </includes>
- <excludes>
- <!-- Exclude integration tests within (unit) test phase. -->
- <exclude>**/*IntegrationTest.java</exclude>
- </excludes>
- </configuration>
- </execution>
- <execution>
- <id>integration-tests</id>
- <phase>integration-test</phase>
- <goals>
- <goal>test</goal>
- </goals>
- <configuration>
- <!-- Never skip running the tests when the integration-test phase is invoked -->
- <skip>false</skip>
- <includes>
- <!-- Include integration tests within integration-test phase. -->
- <include>**/*IntegrationTest.java</include>
- </includes>
- </configuration>
- </execution>
- </executions>
</plugin>
-
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/bash/prune_elasticsearch_indices.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/prune_elasticsearch_indices.sh b/metron-streaming/Metron-DataLoads/src/main/bash/prune_elasticsearch_indices.sh
new file mode 100644
index 0000000..fc69858
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/prune_elasticsearch_indices.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Forwards all arguments to ElasticsearchDataPrunerRunner; ${project.version} is presumably filled in by Maven resource filtering at build time (bash itself rejects it as a bad substitution).
+yarn jar /usr/metron/${project.version}/lib/Metron-DataLoads-${project.version}.jar org.apache.metron.dataloads.bulk.ElasticsearchDataPrunerRunner "$@"
+
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
new file mode 100644
index 0000000..98e3b52
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+/** Base class for data pruners: holds the time window, ending at startDate and spanning numDays days, that subclasses prune. */
+public abstract class DataPruner {
+ // Subclasses read firstTimeMillis/lastTimeMillis (epoch ms) to decide what falls inside the window.
+ protected static final Logger LOG = LoggerFactory.getLogger(DataPruner.class);
+ protected long firstTimeMillis;
+ protected long lastTimeMillis;
+ protected String wildCard;
+ /** Window runs from numDays days before startDate up to startDate; throws StartDateException unless startDate's day is before today. */
+ public DataPruner(Date startDate, Integer numDays, String wildCard) throws StartDateException {
+ // NOTE(review): the "before today" check uses midnight-truncated dates, but the window uses startDate's exact time.
+ Date startAtMidnight = dateAtMidnight(startDate);
+ this.lastTimeMillis = startDate.getTime();
+ this.firstTimeMillis = lastTimeMillis - TimeUnit.DAYS.toMillis(numDays);
+ this.wildCard = wildCard;
+ // NOTE(review): numDays is unboxed above, so a null numDays throws NullPointerException.
+ Date today = dateAtMidnight(new Date());
+
+ if (!today.after(startAtMidnight)) {
+ throw new StartDateException("Prune Start Date must be prior to today");
+ }
+ }
+ /** Returns a new Date for the same day as date, truncated to 00:00:00.000 in the JVM default time zone. */
+ protected Date dateAtMidnight(Date date) {
+
+ Calendar calendar = Calendar.getInstance();
+
+ calendar.setTime(date);
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND, 0);
+ return calendar.getTime();
+
+ }
+
+ /** Prunes the data in the window and returns how many items were pruned (e.g. indices, for Elasticsearch). */
+ public abstract Long prune() throws IOException;
+
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
new file mode 100644
index 0000000..ebd0878
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.metron.domain.Configuration;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+
+public class ElasticsearchDataPruner extends DataPruner {
+
+    private String indexPattern;
+    private SimpleDateFormat dateFormat;
+    protected Client indexClient = null;
+    protected Configuration configuration;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataPruner.class);
+    private static final String defaultDateFormat = "yyyy.MM.dd.HH";
+
+    /**
+     * Accepts an index name only if it starts with {@code indexPattern} and the remainder
+     * parses, via {@code dateFormat}, to a time in [firstTimeMillis, lastTimeMillis).
+     * The prefix check keeps names shorter than the prefix (e.g. ".kibana") from throwing
+     * StringIndexOutOfBoundsException, and keeps another sensor's index whose prefix has the
+     * same length (e.g. "yaf_index_" while pruning "bro_index_") from being selected.
+     */
+    private Predicate<String> filterWithRegex = new Predicate<String>() {
+
+        @Override
+        public boolean apply(String str) {
+            if (str == null || !str.startsWith(indexPattern)) {
+                return false;
+            }
+            String dateString = str.substring(indexPattern.length());
+            try {
+                long indexCreatedDate = dateFormat.parse(dateString).getTime();
+                return indexCreatedDate >= firstTimeMillis && indexCreatedDate < lastTimeMillis;
+            } catch (ParseException e) {
+                LOG.error("Unable to parse date from " + dateString, e);
+                return false;
+            }
+        }
+    };
+
+    /**
+     * @param startDate     exclusive end of the pruning window (becomes lastTimeMillis)
+     * @param numDays       length of the window, in days, ending at startDate
+     * @param configuration supplies the global "es.date.format" setting on each prune()
+     * @param indexClient   Elasticsearch client used to list and delete indices
+     * @param indexPattern  index-name prefix (e.g. "bro_index_"); the rest of the name is a date
+     */
+    ElasticsearchDataPruner(Date startDate, Integer numDays, Configuration configuration, Client indexClient, String indexPattern) throws Exception {
+        super(startDate, numDays, indexPattern);
+        this.indexPattern = indexPattern;
+        this.dateFormat = new SimpleDateFormat(defaultDateFormat);
+        this.configuration = configuration;
+        this.indexClient = indexClient;
+    }
+
+    /**
+     * Refreshes the configuration, then deletes every index accepted by filterWithRegex.
+     * A failed refresh is logged and pruning continues with the configuration already loaded.
+     *
+     * @return the number of indices submitted for deletion
+     */
+    @Override
+    public Long prune() throws IOException {
+        try {
+            configuration.update();
+        } catch (Exception e) {
+            LOG.error("Unable to update configs", e);
+        }
+
+        // Check for null before calling toString() so a missing "es.date.format" key falls
+        // back to the default format instead of throwing NullPointerException.
+        Object configuredFormat = configuration.getGlobalConfig() == null
+                ? null : configuration.getGlobalConfig().get("es.date.format");
+        if (configuredFormat != null) {
+            dateFormat = new SimpleDateFormat(configuredFormat.toString());
+        }
+
+        ImmutableOpenMap<String, IndexMetaData> allIndices = indexClient.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
+        String[] indicesForDeletion = Iterables.toArray(getFilteredIndices(allIndices), String.class);
+
+        if (indicesForDeletion.length > 0 && !Boolean.TRUE.equals(deleteIndex(indexClient.admin(), indicesForDeletion))) {
+            LOG.warn("Delete request for " + indicesForDeletion.length + " indices was not acknowledged");
+        }
+        return Long.valueOf(indicesForDeletion.length);
+    }
+
+    /**
+     * Deletes the named indices in a single request.
+     *
+     * @return whether the cluster acknowledged the delete request
+     */
+    protected Boolean deleteIndex(AdminClient adminClient, String... index) {
+        boolean isAcknowledged = adminClient.indices().delete(adminClient.indices().prepareDelete(index).request()).actionGet().isAcknowledged();
+        return Boolean.valueOf(isAcknowledged);
+    }
+
+    /** Returns a lazily filtered view of the index names in {@code indices} accepted by filterWithRegex. */
+    protected Iterable<String> getFilteredIndices(ImmutableOpenMap<String, IndexMetaData> indices) {
+        Object[] names = IteratorUtils.toArray(indices.keysIt());
+        String[] indexNames = Arrays.copyOf(names, names.length, String[].class);
+        return Iterables.filter(Arrays.asList(indexNames), filterWithRegex);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
new file mode 100644
index 0000000..46100a2
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.cli.*;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.domain.Configuration;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+public class ElasticsearchDataPrunerRunner {
+
+ private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataPruner.class);
+
+ public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+
+ /**
+ * Example
+ * start=$(date -d '30 days ago' +%m/%d/%Y)
+ * yarn jar Metron-DataLoads-{VERSION}.jar org.apache.metron.dataloads.bulk.ElasticsearchDataPrunerRunner -i host1:9300 -p '/bro_index_' -s $(date -d '30 days ago' +%m/%d/%Y) -n 1;
+ * echo ${start}
+ **/
+
+ Options options = buildOptions();
+ Options help = new Options();
+ TransportClient client = null;
+
+ Option o = new Option("h", "help", false, "This screen");
+ o.setRequired(false);
+ help.addOption(o);
+
+
+
+ try {
+
+ CommandLine cmd = checkOptions(help,options, argv);
+
+ String start = cmd.getOptionValue("s");
+ Date startDate = new SimpleDateFormat("MM/dd/yyyy").parse(start);
+
+ Integer numDays = Integer.parseInt(cmd.getOptionValue("n"));
+ String indexPrefix = cmd.getOptionValue("p");
+
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Running prune with args: " + startDate + " " + numDays);
+ }
+
+ Configuration configuration = null;
+
+ if( cmd.hasOption("z")){
+
+ RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+ CuratorFramework framework = CuratorFrameworkFactory.newClient(cmd.getOptionValue("z"),retryPolicy);
+ framework.start();
+ configuration = new Configuration(framework);
+
+ } else if ( cmd.hasOption("c") ){
+
+ String resourceFile = cmd.getOptionValue("c");
+ configuration = new Configuration(Paths.get(resourceFile));
+
+ }
+
+ configuration.update();
+
+ Map<String, Object> globalConfiguration = configuration.getGlobalConfig();
+ ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
+ builder.put("cluster.name", globalConfiguration.get("es.clustername"));
+ builder.put("curatorFramework.transport.ping_timeout","500s");
+ client = new TransportClient(builder.build())
+ .addTransportAddress(new InetSocketTransportAddress(globalConfiguration.get("es.ip").toString(), Integer.parseInt(globalConfiguration.get("es.port").toString())));
+
+ DataPruner pruner = new ElasticsearchDataPruner(startDate, numDays, configuration, client, indexPrefix);
+
+ LOG.info("Pruned " + pruner.prune() + " indices from " + globalConfiguration.get("es.ip") + ":" + globalConfiguration.get("es.port") + "/" + indexPrefix);
+
+
+ } catch (Exception e) {
+
+ e.printStackTrace();
+ System.exit(-1);
+
+ } finally {
+
+ if( null != client) {
+ client.close();
+ }
+
+ }
+
+ }
+
+ public static CommandLine checkOptions(Options help, Options options, String ... argv) throws ParseException {
+
+ CommandLine cmd = null;
+ CommandLineParser parser = new PosixParser();
+
+
+ try {
+
+ cmd = parser.parse(help,argv,true);
+
+ if( cmd.getOptions().length > 0){
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+ System.exit(0);
+ }
+
+ cmd = parser.parse(options, argv);
+
+ } catch (ParseException e) {
+
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+ throw e;
+
+ }
+
+
+ if( (cmd.hasOption("z") && cmd.hasOption("c")) || (!cmd.hasOption("z") && !cmd.hasOption("c")) ){
+
+ System.err.println("One (only) of zookeeper-hosts or config-location is required");
+ final HelpFormatter usageFormatter = new HelpFormatter();
+ usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+ throw new RuntimeException("Must specify zookeeper-hosts or config-location, but not both");
+
+ }
+
+ return cmd;
+ }
+
+ public static Options buildOptions(){
+
+ Options options = new Options();
+
+ Option o = new Option("s", "start-date", true, "Starting Date (MM/DD/YYYY)");
+ o.setArgName("START_DATE");
+ o.setRequired(true);
+ options.addOption(o);
+
+ o = new Option("n", "numdays", true, "Number of days back to purge");
+ o.setArgName("NUMDAYS");
+ o.setRequired(true);
+ options.addOption(o);
+
+ o = new Option("p", "index-prefix", true, "Index prefix - e.g. bro_index_");
+ o.setArgName("PREFIX");
+ o.setRequired(true);
+ options.addOption(o);
+
+ o = new Option("c", "config-location", true, "Directory Path - e.g. /path/to/config/dir");
+ o.setArgName("CONFIG");
+ o.setRequired(false);
+ options.addOption(o);
+
+ o = new Option("z", "zookeeper-hosts", true, "Zookeeper URL - e.g. zkhost1:2181,zkhost2:2181,zkhost3:2181");
+ o.setArgName("PREFIX");
+ o.setRequired(false);
+ options.addOption(o);
+
+ return options;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
index 5fbc27e..097253c 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
@@ -30,29 +30,19 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.text.SimpleDateFormat;
-import java.util.Calendar;
import java.util.Date;
-import java.util.concurrent.TimeUnit;
-public class HDFSDataPruner {
+public class HDFSDataPruner extends DataPruner {
+
- private static final Logger LOG = LoggerFactory.getLogger(HDFSDataPruner.class);
- private Date startDate;
- private long firstTimeMillis;
- private long lastTimeMillis;
private Path globPath;
protected FileSystem fileSystem;
+ protected static final Logger LOG = LoggerFactory.getLogger(HDFSDataPruner.class);
- private HDFSDataPruner() {
- }
-
-
- HDFSDataPruner(Date startDate, Integer numDays, String fsUri, String globPath) throws IOException {
+ HDFSDataPruner(Date startDate, Integer numDays, String fsUri, String globPath) throws IOException, StartDateException {
- this.startDate = dateAtMidnight(startDate);
- this.lastTimeMillis = startDate.getTime();
- this.firstTimeMillis = lastTimeMillis - TimeUnit.DAYS.toMillis(numDays);
- this.globPath = new Path(globPath);
+ super(startDate,numDays,globPath);
+ this.globPath = new Path(wildCard);
Configuration conf = new Configuration();
conf.set("fs.defaultFS", fsUri);
this.fileSystem = FileSystem.get(conf);
@@ -136,7 +126,7 @@ public class HDFSDataPruner {
LOG.debug("Running prune with args: " + startDate + " " + numDays + " " + fileSystemUri + " " + globString);
}
- HDFSDataPruner pruner = new HDFSDataPruner(startDate, numDays, fileSystemUri, globString);
+ DataPruner pruner = new HDFSDataPruner(startDate, numDays, fileSystemUri, globString);
LOG.info("Pruned " + pruner.prune() + " files from " + fileSystemUri + globString);
@@ -151,12 +141,6 @@ public class HDFSDataPruner {
Long filesPruned = new Long(0);
- Date today = dateAtMidnight(new Date());
-
- if (!today.after(startDate)) {
- throw new RuntimeException("Prune Start Date must be prior to today");
- }
-
FileStatus[] filesToDelete = fileSystem.globStatus(globPath, new HDFSDataPruner.DateFileFilter(this));
for (FileStatus fileStatus : filesToDelete) {
@@ -173,20 +157,6 @@ public class HDFSDataPruner {
return filesPruned;
}
- private Date dateAtMidnight(Date date) {
-
- Calendar calendar = Calendar.getInstance();
-
- calendar.setTime(date);
- calendar.set(Calendar.HOUR, 0);
- calendar.set(Calendar.MINUTE, 0);
- calendar.set(Calendar.SECOND, 0);
- calendar.set(Calendar.MILLISECOND, 0);
-
- return calendar.getTime();
-
- }
-
class DateFileFilter extends Configured implements PathFilter {
HDFSDataPruner pruner;
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
new file mode 100644
index 0000000..d3a0549
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+
+/**
+ * Checked exception signalling an invalid prune start date
+ * (for example, HDFSDataPruner rejects a start date of today).
+ */
+public class StartDateException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * @param message description of the problem with the start date
+ */
+ public StartDateException(String message){
+ super(message);
+ }
+
+ /**
+ * @param message description of the problem with the start date
+ * @param t the underlying cause
+ */
+ public StartDateException(String message, Throwable t){
+ super(message,t);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerIntegrationTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerIntegrationTest.java
new file mode 100644
index 0000000..2053e34
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerIntegrationTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.domain.Configuration;
+import org.easymock.EasyMock;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.IndexMissingException;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes = 1, numClientNodes = 0)
+public class ElasticsearchDataPrunerIntegrationTest extends ElasticsearchIntegrationTest {
+
+ private static File dataPath = new File("./target/elasticsearch-test");
+ private Date testingDate;
+ private Date yesterday = new Date();
+ private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
+ private Configuration configuration;
+
+ @BeforeClass
+ public static void setupClass() throws Exception {
+
+ if (dataPath.isDirectory()) {
+ FileUtils.deleteDirectory(dataPath);
+ }
+
+ if (!dataPath.mkdirs()) {
+ throw new RuntimeException("Couldn't create dataPath at: " + dataPath.getAbsolutePath());
+ }
+
+ }
+
+ @AfterClass
+ public static void teardownClass() throws Exception {
+
+ if (dataPath.isDirectory()) {
+ FileUtils.deleteDirectory(dataPath);
+ }
+
+ }
+
+ @Before
+ public void setUp() throws Exception {
+
+ super.setUp();
+ ensureGreen();
+
+ // testingDate is today's midnight in UTC; dateFormat is also pinned to UTC.
+ TimeZone timeZone = TimeZone.getTimeZone("UTC");
+ Calendar calendar = Calendar.getInstance(timeZone);
+ calendar.set(Calendar.HOUR_OF_DAY,0);
+ calendar.set(Calendar.MINUTE,0);
+ calendar.set(Calendar.SECOND,0);
+ calendar.set(Calendar.MILLISECOND,0);
+ testingDate = calendar.getTime();
+ yesterday.setTime(testingDate.getTime() - TimeUnit.DAYS.toMillis(1));
+ dateFormat.setTimeZone(timeZone);
+
+ File resourceFile = new File("../Metron-Testing/src/main/resources/sample/config/");
+ Path resourcePath = Paths.get(resourceFile.getCanonicalPath());
+
+ configuration = new Configuration(resourcePath);
+ }
+
+ @Test(expected = IndexMissingException.class)
+ public void testWillThrowOnMissingIndex() throws Exception {
+
+ ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(yesterday, 30, configuration,client(), "*");
+ pruner.deleteIndex(admin(), "baz");
+
+ }
+
+ @Test
+ public void testDeletesCorrectIndexes() throws Exception {
+
+ Integer numDays = 5;
+
+ Date createStartDate = new Date();
+
+ createStartDate.setTime(yesterday.getTime() - TimeUnit.DAYS.toMillis(numDays - 1));
+
+ // Format the day in the same (UTC) zone as the other dates in this test, not the JVM default.
+ DateFormat dayFormat = new SimpleDateFormat("yyyy.MM.dd");
+ dayFormat.setTimeZone(dateFormat.getTimeZone());
+
+ ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(yesterday, 30, configuration,client(), "*");
+ String indexesToDelete = "sensor_index_" + dayFormat.format(createStartDate) + ".*";
+ Boolean deleted = pruner.deleteIndex(admin(), indexesToDelete);
+
+ assertTrue("Index deletion should be acknowledged", deleted);
+
+ }
+
+ @Test
+ public void testHandlesNoIndicesToDelete() throws Exception {
+
+ ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(yesterday, 1, configuration, client(), "sensor_index_");
+ Long deleteCount = pruner.prune();
+ assertEquals("Should have pruned 0 indices", 0L, deleteCount.longValue());
+
+
+ }
+
+ @Override
+ protected Settings nodeSettings(int nodeOrdinal) {
+
+ return ImmutableSettings.settingsBuilder()
+ .put("node.data", true)
+ .put("gateway.type", "none")
+ .put("path.data", dataPath.getPath() + "/data")
+ .put("path.work", dataPath.getPath() + "/work")
+ .put("path.logs", dataPath.getPath() + "/logs")
+ .put("cluster.routing.schedule", "50ms")
+ .put("node.local", true).build();
+
+ }
+
+ public Settings indexSettings() {
+
+ return ImmutableSettings.settingsBuilder()
+ .put("index.store.type", "memory")
+ .put("index.store.fs.memory.enabled", "true")
+ .put("index.number_of_shards", 1)
+ .put("index.number_of_replicas", 0).build();
+
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunnerTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunnerTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunnerTest.java
new file mode 100644
index 0000000..5f32bee
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunnerTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+
+public class ElasticsearchDataPrunerRunnerTest {
+
+ private Options options;
+ private Options help;
+
+ private ByteArrayOutputStream outContent;
+ private ByteArrayOutputStream errContent;
+
+ // Console streams captured before setUp() redirects them, so tearDown() can restore them.
+ private final PrintStream originalOut = System.out;
+ private final PrintStream originalErr = System.err;
+
+ @Before
+ public void setUp(){
+
+ options = ElasticsearchDataPrunerRunner.buildOptions();
+ help = new Options();
+
+ Option o = new Option("h", "help", false, "This screen");
+ o.setRequired(false);
+ help.addOption(o);
+
+ outContent = new ByteArrayOutputStream();
+ errContent = new ByteArrayOutputStream();
+
+ System.setOut(new PrintStream(outContent));
+ System.setErr(new PrintStream(errContent));
+
+ }
+
+ @org.junit.After
+ public void tearDown(){
+
+ // Restore the real streams so later tests in the same JVM still produce output.
+ System.setOut(originalOut);
+ System.setErr(originalErr);
+
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testThrowsWithoutZookeeperOrConfigLocation() throws Exception {
+
+ String[] args = new String[]{"-n","30","-p","sensor_index","-s","03/30/2016"};
+ ElasticsearchDataPrunerRunner.checkOptions(help,options,args);
+
+ }
+
+ @Test(expected = RuntimeException.class)
+ public void testThrowsWithZookeeperAndConfiguration() throws Exception {
+
+ // Supplying both -z and -c must be rejected.
+ String[] args = new String[]{"-n","30","-p","sensor_index","-s","03/30/2016","-z","localhost:2181","-c","/tmp/config"};
+ ElasticsearchDataPrunerRunner.checkOptions(help,options,args);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
new file mode 100644
index 0000000..33d3e11
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.metron.domain.Configuration;
+import org.easymock.EasyMock;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.client.*;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.hppc.ObjectObjectOpenHashMap;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.indices.IndexMissingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DeleteIndexResponse.class)
+public class ElasticsearchDataPrunerTest {
+
+ private Date testDate;
+ private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
+ private Configuration configuration;
+
+ private Client indexClient = mock(Client.class);
+ private AdminClient adminClient = mock(AdminClient.class);
+ private IndicesAdminClient indicesAdminClient = mock(FilterClient.IndicesAdmin.class);
+ private DeleteIndexRequestBuilder deleteIndexRequestBuilder = mock(DeleteIndexRequestBuilder.class);
+ private DeleteIndexRequest deleteIndexRequest = mock(DeleteIndexRequest.class);
+ private ActionFuture<DeleteIndexResponse> deleteIndexAction = mock(ActionFuture.class);
+ private DeleteIndexResponse deleteIndexResponse = PowerMock.createMock(DeleteIndexResponse.class);
+
+ // Console streams captured before setUp() redirects them, so tearDown() can restore them.
+ private final PrintStream originalOut = System.out;
+ private final PrintStream originalErr = System.err;
+
+ private ByteArrayOutputStream outContent;
+ private ByteArrayOutputStream errContent;
+
+ @Before
+ public void setUp() throws Exception {
+
+ Calendar calendar = Calendar.getInstance();
+ calendar.set(Calendar.MONTH, Calendar.MARCH);
+ calendar.set(Calendar.YEAR, 2016);
+ calendar.set(Calendar.DATE, 31);
+ calendar.set(Calendar.HOUR_OF_DAY, 0);
+ calendar.set(Calendar.MINUTE, 0);
+ calendar.set(Calendar.SECOND, 0);
+ calendar.set(Calendar.MILLISECOND,0);
+ testDate = calendar.getTime();
+
+ when(indexClient.admin()).thenReturn(adminClient);
+ when(adminClient.indices()).thenReturn(indicesAdminClient);
+ when(indicesAdminClient.prepareDelete(Matchers.<String>anyVararg())).thenReturn(deleteIndexRequestBuilder);
+ when(indicesAdminClient.delete((DeleteIndexRequest) any())).thenReturn(deleteIndexAction);
+ when(deleteIndexRequestBuilder.request()).thenReturn(deleteIndexRequest);
+ when(deleteIndexAction.actionGet()).thenReturn(deleteIndexResponse);
+
+ File resourceFile = new File("../Metron-Testing/src/main/resources/sample/config/");
+ Path resourcePath = Paths.get(resourceFile.getCanonicalPath());
+
+ configuration = new Configuration(resourcePath);
+
+ outContent = new ByteArrayOutputStream();
+ errContent = new ByteArrayOutputStream();
+
+ System.setOut(new PrintStream(outContent));
+ System.setErr(new PrintStream(errContent));
+
+ }
+
+ @org.junit.After
+ public void tearDown() {
+
+ // Restore the real streams so later tests in the same JVM still produce output.
+ System.setOut(originalOut);
+ System.setErr(originalErr);
+
+ }
+
+ @Test(expected = IndexMissingException.class)
+ public void testWillThrowOnMissingIndex() throws Exception {
+
+ when(indicesAdminClient.delete((DeleteIndexRequest) any())).thenThrow(new IndexMissingException(new Index("Test Exception")));
+ ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 30, configuration, indexClient,"*");
+ pruner.deleteIndex(adminClient, "baz");
+
+ }
+
+ @Test
+ public void testDeletesCorrectIndexes() throws Exception {
+
+ //Mock Cluster Admin
+ ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
+ ClusterStateRequestBuilder clusterStateRequestBuilder = mock(ClusterStateRequestBuilder.class);
+ ClusterStateResponse clusterStateResponse = mock(ClusterStateResponse.class);
+ ClusterState clusterState = mock(ClusterState.class);
+ ObjectObjectOpenHashMap<String, IndexMetaData> clusterIndexes = new ObjectObjectOpenHashMap<>();
+ MetaData clusterMetadata = mock(MetaData.class);
+ when(adminClient.cluster()).thenReturn(clusterAdminClient);
+ when(clusterAdminClient.prepareState()).thenReturn(clusterStateRequestBuilder);
+ when(clusterStateRequestBuilder.get()).thenReturn(clusterStateResponse);
+ when(clusterStateResponse.getState()).thenReturn(clusterState);
+ when(clusterState.getMetaData()).thenReturn(clusterMetadata);
+
+ int numDays = 5;
+
+ Date indexDate = new Date();
+
+ indexDate.setTime(testDate.getTime() - TimeUnit.DAYS.toMillis(numDays));
+
+ // One index per hour across the numDays days before testDate.
+ for (int i = 0; i < numDays * 24; i++) {
+
+ String indexName = "sensor_index_" + dateFormat.format(indexDate);
+ clusterIndexes.put(indexName, null);
+ indexDate.setTime(indexDate.getTime() + TimeUnit.HOURS.toMillis(1));
+
+ }
+
+ when(clusterMetadata.getIndices()).thenReturn(ImmutableOpenMap.copyOf(clusterIndexes));
+
+
+ EasyMock.expect(deleteIndexResponse.isAcknowledged()).andReturn(true);
+
+ replayAll();
+ ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 1, configuration, indexClient, "sensor_index_");
+ pruner.indexClient = indexClient;
+ Long deleteCount = pruner.prune();
+ assertEquals("Should have pruned 24 indices", 24L, deleteCount.longValue());
+ verifyAll();
+
+ }
+
+ @Test
+ public void testFilter() throws Exception {
+
+ ObjectObjectOpenHashMap<String, IndexMetaData> indexNames = new ObjectObjectOpenHashMap<>();
+ SimpleDateFormat dateChecker = new SimpleDateFormat("yyyyMMdd");
+ int numDays = 5;
+ String[] expectedIndices = new String[24];
+ Date indexDate = new Date();
+
+ indexDate.setTime(testDate.getTime() - TimeUnit.DAYS.toMillis(numDays));
+
+ for (int i = 0, j=0; i < numDays * 24; i++) {
+
+ String indexName = "sensor_index_" + dateFormat.format(indexDate);
+ //Delete 20160330
+ if( dateChecker.format(indexDate).equals("20160330") ){
+ expectedIndices[j++] = indexName;
+ }
+
+ indexNames.put(indexName, null);
+ indexDate.setTime(indexDate.getTime() + TimeUnit.HOURS.toMillis(1));
+
+ }
+
+ ImmutableOpenMap<String, IndexMetaData> testIndices = ImmutableOpenMap.copyOf(indexNames);
+
+ ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 1, configuration, indexClient, "sensor_index_");
+ pruner.indexClient = indexClient;
+
+ Iterable<String> filteredIndices = pruner.getFilteredIndices(testIndices);
+
+ Object[] indexArray = IteratorUtils.toArray(filteredIndices.iterator());
+ Arrays.sort(indexArray);
+ Arrays.sort(expectedIndices);
+
+ assertArrayEquals(expectedIndices,indexArray);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java
index b08c2a5..9bc695c 100644
--- a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java
@@ -30,6 +30,7 @@ import java.util.Date;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.*;
@@ -71,11 +72,10 @@ public class HDFSDataPrunerTest {
}
- @Test(expected = RuntimeException.class)
+ /** Constructing a pruner with today's date as the start date must throw StartDateException; prune() is never reached. */
+ @Test(expected = StartDateException.class)
public void testFailsOnTodaysDate() throws Exception {
HDFSDataPruner pruner = new HDFSDataPruner(todaysDate, 30, "file:///", dataPath.getAbsolutePath() + "/file-*");
- pruner.prune();
}
@@ -114,6 +114,19 @@ public class HDFSDataPrunerTest {
}
+ @Test
+ public void testIgnoresDirectoies() throws Exception {
+
+ FileSystem testFS = mock(FileSystem.class);
+ when(testFS.isDirectory((Path) any())).thenReturn(true);
+
+ HDFSDataPruner pruner = new HDFSDataPruner(yesterday, 30, "file:///", dataPath.getAbsolutePath() + "/file-*");
+ pruner.fileSystem = testFS;
+ HDFSDataPruner.DateFileFilter filter = pruner.new DateFileFilter(pruner, false);
+ assertFalse("Should ignore directories",filter.accept(new Path("/tmp")));
+
+ }
+
@Test(expected = RuntimeException.class)
public void testThrowBadFile() throws Exception {
@@ -122,6 +135,7 @@ public class HDFSDataPrunerTest {
when(testFS.getFileStatus((Path) any())).thenThrow(new IOException("Test Exception"));
HDFSDataPruner pruner = new HDFSDataPruner(yesterday, 30, "file:///", dataPath.getAbsolutePath() + "/file-*");
+
pruner.fileSystem = testFS;
HDFSDataPruner.DateFileFilter filter = pruner.new DateFileFilter(pruner, true);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/resources/log4j2.xml b/metron-streaming/Metron-DataLoads/src/test/resources/log4j2.xml
deleted file mode 100755
index 68d5eac..0000000
--- a/metron-streaming/Metron-DataLoads/src/test/resources/log4j2.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<configuration monitorInterval="60">
- <Appenders>
- <Console name="Console" target="SYSTEM_OUT">
- <PatternLayout pattern="%-4r [%t] %-5p %c{1.} - %msg%n"/>
- </Console>
- </Appenders>
- <Loggers>
- <Root level="error">
- <AppenderRef ref="Console"/>
- </Root>
- </Loggers>
-</configuration>
-
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/resources/logging.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/resources/logging.properties b/metron-streaming/Metron-DataLoads/src/test/resources/logging.properties
deleted file mode 100644
index 5798db4..0000000
--- a/metron-streaming/Metron-DataLoads/src/test/resources/logging.properties
+++ /dev/null
@@ -1,17 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-.level= ERROR
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
index 81aeec2..894557c 100644
--- a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -60,7 +60,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
builder.put(optionalSettings);
}
client = new TransportClient(builder.build())
- .addTransportAddress(new InetSocketTransportAddress((String) globalConfiguration.get("es.ip"), (Integer) globalConfiguration.get("es.port")));
+ .addTransportAddress(new InetSocketTransportAddress(globalConfiguration.get("es.ip").toString(), Integer.parseInt(globalConfiguration.get("es.port").toString())));
dateFormat = new SimpleDateFormat((String) globalConfiguration.get("es.date.format"));
}
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
index 4f26365..408be2d 100644
--- a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
@@ -34,7 +34,7 @@ import java.util.Properties;
public class ElasticsearchEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
private String indexDir = "target/elasticsearch";
- private String dateFormat = "yyyy.MM.dd.hh";
+ private String dateFormat = "yyyy.MM.dd.HH";
private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
@Override
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
index 2e4e0cc..cd56270 100644
--- a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -143,7 +143,7 @@ public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest {
public void test() throws Exception {
cleanHdfsDir(hdfsDir);
final Configurations configurations = SampleUtil.getSampleConfigs();
- final String dateFormat = "yyyy.MM.dd.hh";
+ final String dateFormat = "yyyy.MM.dd.HH";
final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
final String cf = "cf";
final String trackerHBaseTableName = "tracker";
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java
index b15e682..71da674 100644
--- a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java
@@ -27,7 +27,8 @@ public class SampleUtil {
public static final String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
- public static Configurations getSampleConfigs() throws IOException {
+ public static
+ Configurations getSampleConfigs() throws IOException {
Configurations configurations = new Configurations();
configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot));
Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json b/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json
index b8b449b..721f70f 100644
--- a/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json
+++ b/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json
@@ -2,7 +2,7 @@
"es.clustername": "metron",
"es.ip": "localhost",
"es.port": 9300,
- "es.date.format": "yyyy.MM.dd.hh",
+ "es.date.format": "yyyy.MM.dd.HH",
"solr.zookeeper": "localhost:2181",
"solr.collection": "metron",
"solr.numShards": 1,
http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/pom.xml b/metron-streaming/pom.xml
index 2b40883..ecd5994 100644
--- a/metron-streaming/pom.xml
+++ b/metron-streaming/pom.xml
@@ -21,13 +21,13 @@
<packaging>pom</packaging>
<name>Metron-Streaming</name>
<description>Stream analytics for Metron</description>
- <url>https://metron.incubator.apache.org/</url>
- <scm>
- <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-metron.git</connection>
- <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-metron.git</developerConnection>
- <tag>HEAD</tag>
- <url>https://git-wip-us.apache.org/repos/asf/incubator-metron</url>
- </scm>
+ <url>https://metron.incubator.apache.org/</url>
+ <scm>
+ <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-metron.git</connection>
+ <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-metron.git</developerConnection>
+ <tag>HEAD</tag>
+ <url>https://git-wip-us.apache.org/repos/asf/incubator-metron</url>
+ </scm>
<properties>
<twitter>@ApacheMetron</twitter>
<global_opencsv_version>3.7</global_opencsv_version>
@@ -48,7 +48,7 @@
<global_slf4j_version>1.7.7</global_slf4j_version>
<global_opencsv_version>3.7</global_opencsv_version>
<global_solr_version>5.2.1</global_solr_version>
- <global_mockito_version>1.10.16</global_mockito_version>
+ <global_mockito_version>1.10.19</global_mockito_version>
</properties>
<licenses>
<license>
@@ -82,6 +82,15 @@
<module>Metron-Testing</module>
<module>Metron-TestingUtilities</module>
</modules>
+ <dependencyManagement>
+ <dependencies>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${global_mockito_version}</version>
+ </dependency>
+ </dependencies>
+ </dependencyManagement>
<dependencies>
<dependency>
<groupId>junit</groupId>
@@ -94,31 +103,83 @@
<artifactId>multiline-string</artifactId>
<version>0.1.2</version>
<scope>test</scope>
-
</dependency>
</dependencies>
<build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-checkstyle-plugin</artifactId>
- <version>2.17</version>
- <executions>
- <execution>
- <id>checkstyle</id>
- <phase>package</phase>
- <goals>
- <goal>check</goal>
- </goals>
- </execution>
- </executions>
- <configuration>
- <configLocation>style/checkstyle.xml</configLocation>
- <headerLocation>style/LICENSE</headerLocation>
- <failOnViolation>true</failOnViolation>
- <includeTestSourceDirectory>true</includeTestSourceDirectory>
- </configuration>
- </plugin>
+ <pluginManagement>
+ <plugins>
+ <plugin>
+ <!-- Separates the unit tests from the integration tests. -->
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.18</version>
+ <configuration>
+ <!-- Skip the plug-in's default execution (the <skip> below); otherwise every test would run twice, since the executions below run the unit and integration tests explicitly -->
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+ <skip>true</skip>
+ <!-- Show 100% of the lines from the stack trace (doesn't work) -->
+ <trimStackTrace>false</trimStackTrace>
+ </configuration>
+ <executions>
+ <execution>
+ <id>unit-tests</id>
+ <phase>test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include unit tests within (unit) test phase. -->
+ <include>**/*Test.java</include>
+ </includes>
+ <excludes>
+ <!-- Exclude integration tests within (unit) test phase. -->
+ <exclude>**/*IntegrationTest.java</exclude>
+ </excludes>
+ </configuration>
+ </execution>
+ <execution>
+ <id>integration-tests</id>
+ <phase>integration-test</phase>
+ <goals>
+ <goal>test</goal>
+ </goals>
+ <configuration>
+ <!-- Never skip running the tests when the integration-test phase is invoked -->
+ <skip>false</skip>
+ <includes>
+ <!-- Include integration tests within integration-test phase. -->
+ <include>**/*IntegrationTest.java</include>
+ </includes>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </pluginManagement>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-checkstyle-plugin</artifactId>
+ <version>2.17</version>
+ <executions>
+ <execution>
+ <id>checkstyle</id>
+ <phase>package</phase>
+ <goals>
+ <goal>check</goal>
+ </goals>
+ </execution>
+ </executions>
+ <configuration>
+ <configLocation>style/checkstyle.xml</configLocation>
+ <headerLocation>style/LICENSE</headerLocation>
+ <failOnViolation>true</failOnViolation>
+ <includeTestSourceDirectory>true</includeTestSourceDirectory>
+ </configuration>
+ </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
@@ -165,8 +226,8 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18</version>
- <configuration>
- <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+ <configuration>
+ <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
<systemProperties>
<property>
<name>mode</name>
@@ -200,18 +261,17 @@
</plugin>
</plugins>
</reporting>
- <repositories>
- <repository>
- <id>clojars.org</id>
- <url>http://clojars.org/repo</url>
- </repository>
- <repository>
- <id>multiline-release-repo</id>
- <url>https://raw.github.com/benelog/multiline/master/maven-repository</url>
- <snapshots>
- <enabled>false</enabled>
- </snapshots>
- </repository>
-
+ <repositories>
+ <repository>
+ <id>clojars.org</id>
+ <url>http://clojars.org/repo</url>
+ </repository>
+ <repository>
+ <id>multiline-release-repo</id>
+ <url>https://raw.github.com/benelog/multiline/master/maven-repository</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
</repositories>
</project>