You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ly...@apache.org on 2016/04/19 18:18:22 UTC

incubator-metron git commit: METRON-96: Create data purging script for ES Index closes apache/incubator-metron#79

Repository: incubator-metron
Updated Branches:
  refs/heads/master 25e732661 -> 4931f2257


METRON-96: Create data purging script for ES Index closes apache/incubator-metron#79


Project: http://git-wip-us.apache.org/repos/asf/incubator-metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-metron/commit/4931f225
Tree: http://git-wip-us.apache.org/repos/asf/incubator-metron/tree/4931f225
Diff: http://git-wip-us.apache.org/repos/asf/incubator-metron/diff/4931f225

Branch: refs/heads/master
Commit: 4931f22571ce98ed2b5a068052ad71d0a2348f40
Parents: 25e7326
Author: David Lyle <dl...@gmail.com>
Authored: Tue Apr 19 12:17:56 2016 -0400
Committer: David Lyle <dl...@gmail.com>
Committed: Tue Apr 19 12:17:56 2016 -0400

----------------------------------------------------------------------
 .../roles/metron_streaming/defaults/main.yml    |  10 +-
 .../roles/metron_streaming/tasks/es_purge.yml   |  42 ++++
 .../roles/metron_streaming/tasks/hdfs_purge.yml |  12 +-
 .../roles/metron_streaming/tasks/main.yml       |   1 +
 .../templates/config/elasticsearch.global.json  |   6 +-
 .../org/apache/metron/domain/Configuration.java |  60 ++++++
 .../apache/metron/domain/ConfigurationTest.java |  90 ++++++++
 .../src/test/resources/config/global.json       |   3 +
 .../src/test/resources/config/sensors/bro.json  |  19 ++
 metron-streaming/Metron-DataLoads/pom.xml       | 116 +++++-----
 .../main/bash/prune_elasticsearch_indices.sh    |  21 ++
 .../metron/dataloads/bulk/DataPruner.java       |  66 ++++++
 .../dataloads/bulk/ElasticsearchDataPruner.java | 135 ++++++++++++
 .../bulk/ElasticsearchDataPrunerRunner.java     | 191 +++++++++++++++++
 .../metron/dataloads/bulk/HDFSDataPruner.java   |  44 +---
 .../dataloads/bulk/StartDateException.java      |  31 +++
 .../ElasticsearchDataPrunerIntegrationTest.java | 156 ++++++++++++++
 .../bulk/ElasticsearchDataPrunerRunnerTest.java |  72 +++++++
 .../bulk/ElasticsearchDataPrunerTest.java       | 210 +++++++++++++++++++
 .../dataloads/bulk/HDFSDataPrunerTest.java      |  18 +-
 .../src/test/resources/log4j2.xml               |  31 ---
 .../src/test/resources/logging.properties       |  17 --
 .../metron/writer/ElasticsearchWriter.java      |   2 +-
 .../ElasticsearchEnrichmentIntegrationTest.java |   2 +-
 .../integration/EnrichmentIntegrationTest.java  |   2 +-
 .../java/org/apache/metron/util/SampleUtil.java |   3 +-
 .../main/resources/sample/config/global.json    |   2 +-
 metron-streaming/pom.xml                        | 150 +++++++++----
 28 files changed, 1315 insertions(+), 197 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/defaults/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/defaults/main.yml b/deployment/roles/metron_streaming/defaults/main.yml
index 0eff59f..f0f605d 100644
--- a/deployment/roles/metron_streaming/defaults/main.yml
+++ b/deployment/roles/metron_streaming/defaults/main.yml
@@ -56,13 +56,19 @@ storm_topologies:
     - "{{ metron_directory }}/config/topologies/enrichment/remote.yaml"
 
 hdfs_retention_days: 30
-hdfs_bro_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/bro_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/bro-purge/cron-bro-purge.log 2>&1"
-hdfs_yaf_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/yaf_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/yaf-purge/cron-yaf-purge.log 2>&1"
+hdfs_bro_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/bro_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/bro-purge/cron-hdfs-bro-purge.log 2>&1"
+hdfs_yaf_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/yaf_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/yaf-purge/cron-hdfs-yaf-purge.log 2>&1"
+hdfs_snort_purge_cronjob: "{{ metron_directory }}/bin/prune_hdfs_files.sh -f {{ hdfs_url }} -g '/apps/metron/enrichment/indexed/snort_doc/*enrichment-*' -s $(date -d '{{ hdfs_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/yaf-purge/cron-hdfs-yaf-purge.log 2>&1"
 
 elasticsearch_config_path: /etc/elasticsearch
 elasticsearch_cluster_name: metron
 elasticsearch_transport_port: 9300
 
+es_retention_days: 30
+es_bro_purge_cronjob: "{{ metron_directory }}/bin/prune_elasticsearch_indices.sh -z {{ zookeeper_url }} -p bro_index_ -s $(date -d '{{ es_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/bro-purge/cron-es-bro-purge.log 2>&1"
+es_yaf_purge_cronjob: "{{ metron_directory }}/bin/prune_elasticsearch_indices.sh -z {{ zookeeper_url }} -p yaf_index_ -s $(date -d '{{ es_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/yaf-purge/cron-es-yaf-purge.log 2>&1"
+es_snort_purge_cronjob: "{{ metron_directory }}/bin/prune_elasticsearch_indices.sh -z {{ zookeeper_url }} -p yaf_index_ -s $(date -d '{{ es_retention_days }} days ago' +%m/%d/%Y) -n 1 >> /var/log/snort-purge/cron-es-snort-purge.log 2>&1"
+
 metron_hdfs_output_dir: "/apps/metron"
 metron_hdfs_rotation_policy: org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy
 metron_hdfs_rotation_policy_count: 1

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/tasks/es_purge.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/es_purge.yml b/deployment/roles/metron_streaming/tasks/es_purge.yml
new file mode 100644
index 0000000..22616ca
--- /dev/null
+++ b/deployment/roles/metron_streaming/tasks/es_purge.yml
@@ -0,0 +1,42 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+---
+- name: Create Empty Log Files for ES Purge
+  file:
+    path: "{{ item }}"
+    state: touch
+    owner: hdfs
+    group: hdfs
+    mode: 0644
+  with_items:
+    - /var/log/bro-purge/cron-es-bro-purge.log
+    - /var/log/yaf-purge/cron-es-yaf-purge.log
+    - /var/log/snort-purge/cron-es-snort-purge.log
+
+
+- name: Purge Elasticsearch Indices every 30 days.
+  cron:
+    name: "{{ item.name }}"
+    job: "{{ item.job }}"
+    special_time: daily
+    user: hdfs
+  with_items:
+    - { name: "bro_es_purge", job:  "{{ es_bro_purge_cronjob }}" }
+    - { name: "yaf_es_purge", job: "{{ es_yaf_purge_cronjob }}" }
+    - { name: "snort_es_purge", job: "{{ es_snort_purge_cronjob }}" }
+
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/tasks/hdfs_purge.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/hdfs_purge.yml b/deployment/roles/metron_streaming/tasks/hdfs_purge.yml
index 1f63451..33442e4 100644
--- a/deployment/roles/metron_streaming/tasks/hdfs_purge.yml
+++ b/deployment/roles/metron_streaming/tasks/hdfs_purge.yml
@@ -25,6 +25,7 @@
   with_items:
     - /var/log/bro-purge
     - /var/log/yaf-purge
+    - /var/log/snort-purge
 
 - name: Create Empty Log Files for HDFS Purge
   file:
@@ -34,9 +35,9 @@
     group: hdfs
     mode: 0644
   with_items:
-    - /var/log/bro-purge/cron-bro-purge.log
-    - /var/log/yaf-purge/cron-yaf-purge.log
-
+    - /var/log/bro-purge/cron-hdfs-bro-purge.log
+    - /var/log/yaf-purge/cron-hdfs-yaf-purge.log
+    - /var/log/snort-purge/cron-hdfs-snort-purge.log
 
 - name: Purge HDFS Sensor Data every 30 days.
   cron:
@@ -45,6 +46,7 @@
     special_time: daily
     user: hdfs
   with_items:
-    - { name: "bro_purge", job:  "{{ hdfs_bro_purge_cronjob }}" }
-    - { name: "yaf_purge", job: "{{ hdfs_yaf_purge_cronjob }}" }
+    - { name: "bro_hdfs_purge", job:  "{{ hdfs_bro_purge_cronjob }}" }
+    - { name: "yaf_hdfs_purge", job: "{{ hdfs_yaf_purge_cronjob }}" }
+    - { name: "snort_hdfs_purge", job: "{{ hdfs_snort_purge_cronjob }}" }
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/tasks/main.yml
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/tasks/main.yml b/deployment/roles/metron_streaming/tasks/main.yml
index 152af56..e076645 100644
--- a/deployment/roles/metron_streaming/tasks/main.yml
+++ b/deployment/roles/metron_streaming/tasks/main.yml
@@ -133,3 +133,4 @@
 
 - include: hdfs_purge.yml
 
+- include: es_purge.yml

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
----------------------------------------------------------------------
diff --git a/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json b/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
index aa1076c..8177102 100644
--- a/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
+++ b/deployment/roles/metron_streaming/templates/config/elasticsearch.global.json
@@ -1,6 +1,6 @@
 {
   "es.clustername": "{{ elasticsearch_cluster_name }}",
   "es.ip": "{{ groups.search[0] }}",
-  "es.port": {{ elasticsearch_transport_port }},
-  "es.date.format": "yyyy.MM.dd.hh"
-}
\ No newline at end of file
+  "es.port": "{{ elasticsearch_transport_port }}",
+  "es.date.format": "yyyy.MM.dd.HH"
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
new file mode 100644
index 0000000..d21c686
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/main/java/org/apache/metron/domain/Configuration.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.metron.utils.ConfigurationsUtils;
+
+import java.nio.file.Path;
+import java.util.Map;
+
+public class Configuration extends Configurations {
+
+    protected CuratorFramework curatorFramework = null;
+    private Path configFileRoot;
+
+    public Configuration(CuratorFramework curatorFramework){
+
+        this.curatorFramework = curatorFramework;
+
+    }
+
+
+    public Configuration(Path configFileRoot){
+
+        this.configFileRoot = configFileRoot;
+    }
+
+    public void update() throws Exception {
+
+        if( null != curatorFramework ) {
+
+            ConfigurationsUtils.updateConfigsFromZookeeper(this, this.curatorFramework);
+
+        } else {
+
+            updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(configFileRoot.toAbsolutePath().toString()));
+            Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(configFileRoot.toAbsolutePath().toString());
+            for(String sensorType: sensorEnrichmentConfigs.keySet()) {
+                updateSensorEnrichmentConfig(sensorType, sensorEnrichmentConfigs.get(sensorType));
+            }
+
+        }
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationTest.java b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationTest.java
new file mode 100644
index 0000000..74200b2
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/java/org/apache/metron/domain/ConfigurationTest.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.domain;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.api.ExistsBuilder;
+import org.apache.curator.framework.api.GetChildrenBuilder;
+import org.apache.curator.framework.api.GetDataBuilder;
+import org.apache.metron.Constants;
+import org.json.simple.JSONObject;
+import org.junit.Test;
+
+import java.nio.file.Paths;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ConfigurationTest {
+
+    private static final String TEST_PROPERTY = "configuration.class.test.property";
+    private static final String TEST_VALUE = "Configuration";
+    @Test
+    public void testCanReadFromFile() throws Exception {
+
+        Configuration configuration = new Configuration(Paths.get("./src/test/resources/config/"));
+        configuration.update();
+
+        checkResult(configuration);
+
+    }
+
+    @Test
+    public void testCanReadFromZookeeper() throws Exception {
+
+        CuratorFramework curatorFramework = mock(CuratorFramework.class);
+        ExistsBuilder existsBuilder = mock(ExistsBuilder.class);
+        GetDataBuilder getDataBuilder = mock(GetDataBuilder.class);
+        GetChildrenBuilder getChildrenBuilder = mock(GetChildrenBuilder.class);
+
+        when(getDataBuilder.forPath(Constants.ZOOKEEPER_GLOBAL_ROOT)).thenReturn(mockGlobalData());
+        when(curatorFramework.checkExists()).thenReturn(existsBuilder);
+        when(curatorFramework.getData()).thenReturn(getDataBuilder);
+        when(curatorFramework.getChildren()).thenReturn(getChildrenBuilder);
+        when(getChildrenBuilder.forPath(anyString())).thenReturn(Collections.<String> emptyList());
+
+        Configuration configuration = new Configuration(Paths.get("foo"));
+        configuration.curatorFramework = curatorFramework;
+        configuration.update();
+
+        checkResult(configuration);
+    }
+
+
+    private byte[] mockGlobalData(){
+
+        JSONObject global = new JSONObject();
+        global.put(TEST_PROPERTY, TEST_VALUE);
+        return global.toString().getBytes();
+
+    }
+
+
+    private void checkResult( Configuration configuration ){
+
+        assertEquals("File contains 1 entry: ",1,configuration.getGlobalConfig().size());
+        String testValue = configuration.getGlobalConfig().get(TEST_PROPERTY).toString();
+        assertEquals(TEST_PROPERTY + " should be \"" + TEST_VALUE + "\"",TEST_VALUE,testValue);
+
+
+    }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Common/src/test/resources/config/global.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/global.json b/metron-streaming/Metron-Common/src/test/resources/config/global.json
new file mode 100644
index 0000000..44ce6b1
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/resources/config/global.json
@@ -0,0 +1,3 @@
+{
+  "configuration.class.test.property": "Configuration"
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Common/src/test/resources/config/sensors/bro.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Common/src/test/resources/config/sensors/bro.json b/metron-streaming/Metron-Common/src/test/resources/config/sensors/bro.json
new file mode 100644
index 0000000..8886495
--- /dev/null
+++ b/metron-streaming/Metron-Common/src/test/resources/config/sensors/bro.json
@@ -0,0 +1,19 @@
+{
+  "index": "bro",
+  "batchSize": 5,
+  "enrichmentFieldMap":
+  {
+    "geo": ["ip_dst_addr", "ip_src_addr"],
+    "host": ["host"]
+  },
+  "threatIntelFieldMap":
+  {
+    "hbaseThreatIntel": ["ip_dst_addr", "ip_src_addr"]
+  },
+  "fieldToThreatIntelTypeMap":
+  {
+    "ip_dst_addr" : [ "malicious_ip" ]
+    ,"ip_src_addr" : [ "malicious_ip" ]
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/pom.xml b/metron-streaming/Metron-DataLoads/pom.xml
index 53c39da..2d49995 100644
--- a/metron-streaming/Metron-DataLoads/pom.xml
+++ b/metron-streaming/Metron-DataLoads/pom.xml
@@ -120,6 +120,41 @@
             <version>${httpcore.version}</version>
         </dependency>
         <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${global_elasticsearch_version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.carrotsearch.randomizedtesting</groupId>
+            <artifactId>randomizedtesting-runner</artifactId>
+            <version>2.1.14</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.elasticsearch</groupId>
+            <artifactId>elasticsearch</artifactId>
+            <version>${global_elasticsearch_version}</version>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-test-framework</artifactId>
+            <version>4.10.4</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.lucene</groupId>
+            <artifactId>lucene-core</artifactId>
+            <version>4.10.4</version>
+        </dependency>
+        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-testing-util</artifactId>
             <version>${global_hbase_version}</version>
@@ -138,21 +173,44 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>log4j-over-slf4j</artifactId>
-            <version>${global_slf4j_version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>Metron-Testing</artifactId>
             <version>${project.parent.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-slf4j-impl</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.mockito</groupId>
             <artifactId>mockito-core</artifactId>
-            <version>${global_mockito_version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-module-junit4</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-mockito</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.powermock</groupId>
+            <artifactId>powermock-api-easymock</artifactId>
+            <version>1.6.2</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.easymock</groupId>
+            <artifactId>easymock</artifactId>
+            <version>3.4</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
@@ -167,56 +225,14 @@
         </resources>
         <plugins>
             <plugin>
-                <!-- Separates the unit tests from the integration tests. -->
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-surefire-plugin</artifactId>
-                <version>2.12.4</version>
                 <configuration>
-                    <!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
-                    <argLine>-Xmx2048m -XX:MaxPermSize=256m -Djava.util.logging.config.file=logging.properties</argLine>
+                    <argLine>-Xmx2048m -XX:MaxPermSize=256m -XX:-UseSplitVerifier</argLine>
                     <skip>true</skip>
-                    <!-- Show 100% of the lines from the stack trace (doesn't work) -->
                     <trimStackTrace>false</trimStackTrace>
-
                 </configuration>
-                <executions>
-                    <execution>
-                        <id>unit-tests</id>
-                        <phase>test</phase>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                        <configuration>
-                            <!-- Never skip running the tests when the test phase is invoked -->
-                            <skip>false</skip>
-                            <includes>
-                                <!-- Include unit tests within integration-test phase. -->
-                                <include>**/*Test.java</include>
-                            </includes>
-                            <excludes>
-                                <!-- Exclude integration tests within (unit) test phase. -->
-                                <exclude>**/*IntegrationTest.java</exclude>
-                            </excludes>
-                        </configuration>
-                    </execution>
-                    <execution>
-                        <id>integration-tests</id>
-                        <phase>integration-test</phase>
-                        <goals>
-                            <goal>test</goal>
-                        </goals>
-                        <configuration>
-                            <!-- Never skip running the tests when the integration-test phase is invoked -->
-                            <skip>false</skip>
-                            <includes>
-                                <!-- Include integration tests within integration-test phase. -->
-                                <include>**/*IntegrationTest.java</include>
-                            </includes>
-                        </configuration>
-                    </execution>
-                </executions>
             </plugin>
-
             <plugin>
                 <artifactId>maven-compiler-plugin</artifactId>
                 <version>3.1</version>

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/bash/prune_elasticsearch_indices.sh
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/bash/prune_elasticsearch_indices.sh b/metron-streaming/Metron-DataLoads/src/main/bash/prune_elasticsearch_indices.sh
new file mode 100644
index 0000000..fc69858
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/bash/prune_elasticsearch_indices.sh
@@ -0,0 +1,21 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+yarn jar /usr/metron/${project.version}/lib/Metron-DataLoads-${project.version}.jar org.apache.metron.dataloads.bulk.ElasticsearchDataPrunerRunner "$@"
+

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
new file mode 100644
index 0000000..98e3b52
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/DataPruner.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+public abstract class DataPruner {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(DataPruner.class);
+    protected long firstTimeMillis;
+    protected long lastTimeMillis;
+    protected String wildCard;
+
+    public DataPruner(Date startDate, Integer numDays, String wildCard) throws StartDateException {
+
+        Date startAtMidnight = dateAtMidnight(startDate);
+        this.lastTimeMillis = startDate.getTime();
+        this.firstTimeMillis = lastTimeMillis - TimeUnit.DAYS.toMillis(numDays);
+        this.wildCard = wildCard;
+
+        Date today = dateAtMidnight(new Date());
+
+        if (!today.after(startAtMidnight)) {
+            throw new StartDateException("Prune Start Date must be prior to today");
+        }
+    }
+
+    protected Date dateAtMidnight(Date date) {
+
+        Calendar calendar = Calendar.getInstance();
+
+        calendar.setTime(date);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        return calendar.getTime();
+
+    }
+
+
+    public abstract Long prune() throws IOException;
+
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
new file mode 100644
index 0000000..ebd0878
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPruner.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.Iterables;
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.metron.domain.Configuration;
+import org.elasticsearch.client.AdminClient;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.Iterator;
+
+public class ElasticsearchDataPruner extends DataPruner {
+
+    private String indexPattern;
+    private SimpleDateFormat dateFormat;
+    protected Client indexClient = null;
+    protected Configuration configuration;
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataPruner.class);
+    private static final String defaultDateFormat = "yyyy.MM.dd.HH";
+
+
+
+    private Predicate<String> filterWithRegex = new Predicate<String>() {
+
+        @Override
+        public boolean apply(String str) {
+
+            try {
+                String dateString = str.substring(indexPattern.length());
+                Date indexCreateDate = dateFormat.parse(dateString);
+                long indexCreatedDate = indexCreateDate.getTime();
+                if (indexCreatedDate >= firstTimeMillis && indexCreatedDate < lastTimeMillis) {
+                    return true;
+                }
+            } catch (ParseException e) {
+                LOG.error("Unable to parse date from + " + str.substring(indexPattern.length()), e);
+            }
+
+            return false;
+        }
+
+    };
+
+    ElasticsearchDataPruner(Date startDate, Integer numDays,Configuration configuration, Client indexClient, String indexPattern) throws Exception {
+
+        super(startDate, numDays, indexPattern);
+
+        this.indexPattern = indexPattern;
+        this.dateFormat = new SimpleDateFormat(defaultDateFormat);
+        this.configuration = configuration;
+        this.indexClient = indexClient;
+
+
+    }
+
+    @Override
+    public Long prune() throws IOException {
+
+        try {
+
+            configuration.update();
+
+        }
+        catch(Exception e) {
+
+            LOG.error("Unable to update configs",e);
+
+        }
+
+        String dateString = configuration.getGlobalConfig().get("es.date.format").toString();
+
+        if( null != dateString ){
+            dateFormat = new SimpleDateFormat(dateString);
+        }
+
+        ImmutableOpenMap<String, IndexMetaData> allIndices = indexClient.admin().cluster().prepareState().get().getState().getMetaData().getIndices();
+        Iterable indicesForDeletion = getFilteredIndices(allIndices);
+        Object[] indexArray = IteratorUtils.toArray(indicesForDeletion.iterator());
+
+        if(indexArray.length > 0) {
+            String[] indexStringArray = new String[indexArray.length];
+            System.arraycopy(indexArray, 0, indexStringArray, 0, indexArray.length);
+            deleteIndex(indexClient.admin(), indexStringArray);
+        }
+
+        return new Long(indexArray.length);
+
+    }
+
+    protected Boolean deleteIndex(AdminClient adminClient, String... index) {
+
+        boolean isAcknowledged = adminClient.indices().delete(adminClient.indices().prepareDelete(index).request()).actionGet().isAcknowledged();
+        return new Boolean(isAcknowledged);
+
+    }
+
+    protected Iterable<String> getFilteredIndices(ImmutableOpenMap<String, IndexMetaData> indices) {
+
+        String[] returnedIndices = new String[indices.size()];
+        Iterator it = indices.keysIt();
+        System.arraycopy(IteratorUtils.toArray(it), 0, returnedIndices, 0, returnedIndices.length);
+        Iterable<String> matches = Iterables.filter(Arrays.asList(returnedIndices), filterWithRegex);
+
+        return matches;
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
new file mode 100644
index 0000000..46100a2
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunner.java
@@ -0,0 +1,191 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.cli.*;
+import org.apache.curator.RetryPolicy;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.metron.domain.Configuration;
+import org.elasticsearch.client.transport.TransportClient;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.transport.InetSocketTransportAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+public class ElasticsearchDataPrunerRunner {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchDataPruner.class);
+
+    public static void main(String... argv) throws IOException, java.text.ParseException, ClassNotFoundException, InterruptedException {
+
+        /**
+         * Example
+         * start=$(date -d '30 days ago' +%m/%d/%Y)
+         * yarn jar Metron-DataLoads-{VERSION}.jar org.apache.metron.dataloads.bulk.ElasticsearchDataPrunerRunner -i host1:9300 -p '/bro_index_' -s $(date -d '30 days ago' +%m/%d/%Y) -n 1;
+         * echo ${start}
+         **/
+
+        Options options = buildOptions();
+        Options help = new Options();
+        TransportClient client = null;
+
+        Option o = new Option("h", "help", false, "This screen");
+        o.setRequired(false);
+        help.addOption(o);
+
+
+
+        try {
+
+            CommandLine cmd = checkOptions(help,options, argv);
+
+            String start = cmd.getOptionValue("s");
+            Date startDate = new SimpleDateFormat("MM/dd/yyyy").parse(start);
+
+            Integer numDays = Integer.parseInt(cmd.getOptionValue("n"));
+            String indexPrefix = cmd.getOptionValue("p");
+
+            if(LOG.isDebugEnabled()) {
+                LOG.debug("Running prune with args: " + startDate + " " + numDays);
+            }
+
+            Configuration configuration = null;
+
+            if( cmd.hasOption("z")){
+
+                RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
+                CuratorFramework framework = CuratorFrameworkFactory.newClient(cmd.getOptionValue("z"),retryPolicy);
+                framework.start();
+                configuration = new Configuration(framework);
+
+            } else if ( cmd.hasOption("c") ){
+
+                String resourceFile = cmd.getOptionValue("c");
+                configuration = new Configuration(Paths.get(resourceFile));
+
+            }
+
+            configuration.update();
+
+            Map<String, Object> globalConfiguration = configuration.getGlobalConfig();
+            ImmutableSettings.Builder builder = ImmutableSettings.settingsBuilder();
+            builder.put("cluster.name", globalConfiguration.get("es.clustername"));
+            builder.put("curatorFramework.transport.ping_timeout","500s");
+            client = new TransportClient(builder.build())
+                    .addTransportAddress(new InetSocketTransportAddress(globalConfiguration.get("es.ip").toString(), Integer.parseInt(globalConfiguration.get("es.port").toString())));
+
+            DataPruner pruner = new ElasticsearchDataPruner(startDate, numDays, configuration, client, indexPrefix);
+
+            LOG.info("Pruned " + pruner.prune() + " indices from " +  globalConfiguration.get("es.ip") + ":" + globalConfiguration.get("es.port") + "/" + indexPrefix);
+
+
+        } catch (Exception e) {
+
+            e.printStackTrace();
+            System.exit(-1);
+
+        } finally {
+
+            if( null != client) {
+                client.close();
+            }
+
+        }
+
+    }
+
+    public static CommandLine checkOptions(Options help, Options options, String ... argv) throws ParseException {
+
+        CommandLine cmd = null;
+        CommandLineParser parser = new PosixParser();
+
+
+        try {
+
+            cmd = parser.parse(help,argv,true);
+
+            if( cmd.getOptions().length > 0){
+                final HelpFormatter usageFormatter = new HelpFormatter();
+                usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+                System.exit(0);
+            }
+
+            cmd = parser.parse(options, argv);
+
+        } catch (ParseException e) {
+
+            final HelpFormatter usageFormatter = new HelpFormatter();
+            usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+            throw e;
+
+        }
+
+
+        if( (cmd.hasOption("z") && cmd.hasOption("c")) || (!cmd.hasOption("z") && !cmd.hasOption("c")) ){
+
+            System.err.println("One (only) of zookeeper-hosts or config-location is required");
+            final HelpFormatter usageFormatter = new HelpFormatter();
+            usageFormatter.printHelp("ElasticsearchDataPrunerRunner", null, options, null, true);
+            throw new RuntimeException("Must specify zookeeper-hosts or config-location, but not both");
+
+        }
+
+        return cmd;
+    }
+
+    public static Options buildOptions(){
+
+        Options options = new Options();
+
+        Option o = new Option("s", "start-date", true, "Starting Date (MM/DD/YYYY)");
+        o.setArgName("START_DATE");
+        o.setRequired(true);
+        options.addOption(o);
+
+        o = new Option("n", "numdays", true, "Number of days back to purge");
+        o.setArgName("NUMDAYS");
+        o.setRequired(true);
+        options.addOption(o);
+
+        o = new Option("p", "index-prefix", true, "Index prefix  - e.g. bro_index_");
+        o.setArgName("PREFIX");
+        o.setRequired(true);
+        options.addOption(o);
+
+        o = new Option("c", "config-location", true, "Directory Path - e.g. /path/to/config/dir");
+        o.setArgName("CONFIG");
+        o.setRequired(false);
+        options.addOption(o);
+
+        o = new Option("z", "zookeeper-hosts", true, "Zookeeper URL - e.g. zkhost1:2181,zkhost2:2181,zkhost3:2181");
+        o.setArgName("PREFIX");
+        o.setRequired(false);
+        options.addOption(o);
+
+        return options;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
index 5fbc27e..097253c 100644
--- a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/HDFSDataPruner.java
@@ -30,29 +30,19 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.text.SimpleDateFormat;
-import java.util.Calendar;
 import java.util.Date;
-import java.util.concurrent.TimeUnit;
 
-public class HDFSDataPruner  {
+public class HDFSDataPruner extends DataPruner {
+
 
-    private static final Logger LOG = LoggerFactory.getLogger(HDFSDataPruner.class);
-    private Date startDate;
-    private long firstTimeMillis;
-    private long lastTimeMillis;
     private Path globPath;
     protected FileSystem fileSystem;
+    protected static final Logger LOG = LoggerFactory.getLogger(HDFSDataPruner.class);
 
-    private HDFSDataPruner() {
-    }
-
-
-    HDFSDataPruner(Date startDate, Integer numDays, String fsUri, String globPath) throws IOException {
+    HDFSDataPruner(Date startDate, Integer numDays, String fsUri, String globPath) throws IOException, StartDateException {
 
-        this.startDate = dateAtMidnight(startDate);
-        this.lastTimeMillis = startDate.getTime();
-        this.firstTimeMillis = lastTimeMillis - TimeUnit.DAYS.toMillis(numDays);
-        this.globPath = new Path(globPath);
+        super(startDate,numDays,globPath);
+        this.globPath = new Path(wildCard);
         Configuration conf = new Configuration();
         conf.set("fs.defaultFS", fsUri);
         this.fileSystem = FileSystem.get(conf);
@@ -136,7 +126,7 @@ public class HDFSDataPruner  {
                 LOG.debug("Running prune with args: " + startDate + " " + numDays + " " + fileSystemUri + " " + globString);
             }
 
-            HDFSDataPruner pruner = new HDFSDataPruner(startDate, numDays, fileSystemUri, globString);
+            DataPruner pruner = new HDFSDataPruner(startDate, numDays, fileSystemUri, globString);
 
             LOG.info("Pruned " + pruner.prune() + " files from " + fileSystemUri + globString);
 
@@ -151,12 +141,6 @@ public class HDFSDataPruner  {
 
         Long filesPruned = new Long(0);
 
-        Date today = dateAtMidnight(new Date());
-
-        if (!today.after(startDate)) {
-            throw new RuntimeException("Prune Start Date must be prior to today");
-        }
-
         FileStatus[] filesToDelete = fileSystem.globStatus(globPath, new HDFSDataPruner.DateFileFilter(this));
 
         for (FileStatus fileStatus : filesToDelete) {
@@ -173,20 +157,6 @@ public class HDFSDataPruner  {
         return filesPruned;
     }
 
-    private Date dateAtMidnight(Date date) {
-
-        Calendar calendar = Calendar.getInstance();
-
-        calendar.setTime(date);
-        calendar.set(Calendar.HOUR, 0);
-        calendar.set(Calendar.MINUTE, 0);
-        calendar.set(Calendar.SECOND, 0);
-        calendar.set(Calendar.MILLISECOND, 0);
-
-        return calendar.getTime();
-
-    }
-
     class DateFileFilter extends Configured implements PathFilter {
 
         HDFSDataPruner pruner;

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
new file mode 100644
index 0000000..d3a0549
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/main/java/org/apache/metron/dataloads/bulk/StartDateException.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+
+public class StartDateException extends Exception {
+
+    public StartDateException(String message){
+        super(message);
+    }
+
+    public StartDateException(String message, Throwable t){
+        super(message,t);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerIntegrationTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerIntegrationTest.java
new file mode 100644
index 0000000..2053e34
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerIntegrationTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.metron.domain.Configuration;
+import org.easymock.EasyMock;
+import org.elasticsearch.common.settings.ImmutableSettings;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.indices.IndexMissingException;
+import org.elasticsearch.test.ElasticsearchIntegrationTest;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+import java.util.concurrent.TimeUnit;
+
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@ElasticsearchIntegrationTest.ClusterScope(scope = ElasticsearchIntegrationTest.Scope.SUITE, numDataNodes = 1, numClientNodes = 0)
+public class ElasticsearchDataPrunerIntegrationTest extends ElasticsearchIntegrationTest {
+
+    private static File dataPath = new File("./target/elasticsearch-test");
+    private Date testingDate;
+    private Date yesterday = new Date();
+    private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
+    private Configuration configuration;
+
+    @BeforeClass
+    public static void setupClass() throws Exception {
+
+        if (dataPath.isDirectory()) {
+            FileUtils.deleteDirectory(dataPath);
+        }
+
+        if (!dataPath.mkdirs()) {
+            throw new RuntimeException("Couldn't create dataPath at: " + dataPath.getAbsolutePath());
+        }
+
+    }
+
+    @AfterClass
+    public static void teardownClass() throws Exception {
+
+        if (dataPath.isDirectory()) {
+            FileUtils.deleteDirectory(dataPath);
+        }
+
+    }
+
+    @Before
+    public void setUp() throws Exception {
+
+        super.setUp();
+        ensureGreen();
+
+        TimeZone timeZone = TimeZone.getTimeZone("UTC");
+        Calendar calendar = Calendar.getInstance(timeZone);
+        calendar.set(Calendar.HOUR_OF_DAY,0);
+        calendar.set(Calendar.MINUTE,0);
+        calendar.set(Calendar.SECOND,0);
+        testingDate = calendar.getTime();
+        yesterday.setTime(testingDate.getTime() - TimeUnit.DAYS.toMillis(1));
+        dateFormat.setTimeZone(timeZone);
+
+        File resourceFile = new File("../Metron-Testing/src/main/resources/sample/config/");
+        Path resourcePath = Paths.get(resourceFile.getCanonicalPath());
+
+        configuration = new Configuration(resourcePath);
+    }
+
+    @Test(expected = IndexMissingException.class)
+    public void testWillThrowOnMissingIndex() throws Exception {
+
+        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(yesterday, 30, configuration,client(), "*");
+        pruner.deleteIndex(admin(), "baz");
+
+    }
+
+    @Test
+    public void testDeletesCorrectIndexes() throws Exception {
+
+        Integer numDays = 5;
+
+        Date createStartDate = new Date();
+
+        createStartDate.setTime(yesterday.getTime() - TimeUnit.DAYS.toMillis(numDays - 1));
+
+        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(yesterday, 30, configuration,client(), "*");
+        String indexesToDelete = "sensor_index_" + new SimpleDateFormat("yyyy.MM.dd").format(createStartDate) + ".*";
+        Boolean deleted = pruner.deleteIndex(admin(), indexesToDelete);
+
+        assertTrue("Index deletion should be acknowledged", deleted);
+
+    }
+
+    @Test
+    public void testHandlesNoIndicesToDelete() throws Exception {
+
+        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(yesterday, 1, configuration, client(), "sensor_index_");
+        Long deleteCount = pruner.prune();
+        assertEquals("Should have pruned 0 indices", 0L, deleteCount.longValue());
+
+
+    }
+
+    @Override
+    protected Settings nodeSettings(int nodeOrdinal) {
+
+        return ImmutableSettings.settingsBuilder()
+                .put("node.data", true)
+                .put("gateway.type", "none")
+                .put("path.data", dataPath.getPath() + "/data")
+                .put("path.work", dataPath.getPath() + "/work")
+                .put("path.logs", dataPath.getPath() + "/logs")
+                .put("cluster.routing.schedule", "50ms")
+                .put("node.local", true).build();
+
+    }
+
+    public Settings indexSettings() {
+
+        return ImmutableSettings.settingsBuilder()
+                .put("index.store.type", "memory")
+                .put("index.store.fs.memory.enabled", "true")
+                .put("index.number_of_shards", 1)
+                .put("index.number_of_replicas", 0).build();
+
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunnerTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunnerTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunnerTest.java
new file mode 100644
index 0000000..5f32bee
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerRunnerTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+
+public class ElasticsearchDataPrunerRunnerTest {
+
+    private Options options;
+    private Options help;
+
+    private ByteArrayOutputStream outContent;
+    private ByteArrayOutputStream errContent;
+
+    @Before
+    public void setUp(){
+
+        options = ElasticsearchDataPrunerRunner.buildOptions();
+        help = new Options();
+
+        Option o = new Option("h", "help", false, "This screen");
+        o.setRequired(false);
+        help.addOption(o);
+
+        outContent = new ByteArrayOutputStream();
+        errContent = new ByteArrayOutputStream();
+
+        System.setOut(new PrintStream(outContent));
+        System.setErr(new PrintStream(errContent));
+
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testThrowsWithoutZookeeperOrConfigLocation() throws Exception {
+
+        String[] args = new String[]{"-n","30","-p","sensor_index","-s","03/30/2016"};
+        ElasticsearchDataPrunerRunner.checkOptions(help,options,args);
+
+    }
+
+    @Test(expected = RuntimeException.class)
+    public void testThrowsWithZookeeperAndConfiguration() throws Exception {
+
+        String[] args = new String[]{"-n","30","-p","sensor_index","-s","03/30/2016"};
+        ElasticsearchDataPrunerRunner.checkOptions(help,options,args);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
new file mode 100644
index 0000000..33d3e11
--- /dev/null
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/ElasticsearchDataPrunerTest.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.metron.dataloads.bulk;
+
+import org.apache.commons.collections.IteratorUtils;
+import org.apache.metron.domain.Configuration;
+import org.easymock.EasyMock;
+import org.elasticsearch.action.ActionFuture;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateRequestBuilder;
+import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
+import org.elasticsearch.action.admin.indices.delete.DeleteIndexResponse;
+import org.elasticsearch.client.*;
+import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.metadata.IndexMetaData;
+import org.elasticsearch.cluster.metadata.MetaData;
+import org.elasticsearch.common.collect.ImmutableOpenMap;
+import org.elasticsearch.common.hppc.ObjectObjectOpenHashMap;
+import org.elasticsearch.index.Index;
+import org.elasticsearch.indices.IndexMissingException;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.PrintStream;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(DeleteIndexResponse.class)
+public class ElasticsearchDataPrunerTest {
+
+    private Date testDate;
+    private DateFormat dateFormat = new SimpleDateFormat("yyyy.MM.dd.HH");
+    private Configuration configuration;
+
+    private Client indexClient = mock(Client.class);
+    private AdminClient adminClient = mock(AdminClient.class);
+    private IndicesAdminClient indicesAdminClient = mock(FilterClient.IndicesAdmin.class);
+    private DeleteIndexRequestBuilder deleteIndexRequestBuilder = mock(DeleteIndexRequestBuilder.class);
+    private DeleteIndexRequest deleteIndexRequest = mock(DeleteIndexRequest.class);
+    private ActionFuture<DeleteIndexResponse> deleteIndexAction = mock(ActionFuture.class);
+    private DeleteIndexResponse deleteIndexResponse = PowerMock.createMock(DeleteIndexResponse.class);
+
+
+    private ByteArrayOutputStream outContent;
+    private ByteArrayOutputStream errContent;
+
+    @Before
+    public void setUp() throws Exception {
+
+        Calendar calendar = Calendar.getInstance();
+        calendar.set(Calendar.MONTH, Calendar.MARCH);
+        calendar.set(Calendar.YEAR, 2016);
+        calendar.set(Calendar.DATE, 31);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND,0);
+        testDate = calendar.getTime();
+
+        when(indexClient.admin()).thenReturn(adminClient);
+        when(adminClient.indices()).thenReturn(indicesAdminClient);
+        when(indicesAdminClient.prepareDelete(Matchers.<String>anyVararg())).thenReturn(deleteIndexRequestBuilder);
+        when(indicesAdminClient.delete((DeleteIndexRequest) any())).thenReturn(deleteIndexAction);
+        when(deleteIndexRequestBuilder.request()).thenReturn(deleteIndexRequest);
+        when(deleteIndexAction.actionGet()).thenReturn(deleteIndexResponse);
+
+        File resourceFile = new File("../Metron-Testing/src/main/resources/sample/config/");
+        Path resourcePath = Paths.get(resourceFile.getCanonicalPath());
+
+        configuration = new Configuration(resourcePath);
+
+        outContent = new ByteArrayOutputStream();
+        errContent = new ByteArrayOutputStream();
+
+        System.setOut(new PrintStream(outContent));
+        System.setErr(new PrintStream(errContent));
+
+    }
+
+    @Test(expected = IndexMissingException.class)
+    public void testWillThrowOnMissingIndex() throws Exception {
+
+        when(indicesAdminClient.delete((DeleteIndexRequest) any())).thenThrow(new IndexMissingException(new Index("Test Exception")));
+        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 30, configuration, indexClient,"*");
+        pruner.deleteIndex(adminClient, "baz");
+
+    }
+
+    @Test
+    public void testDeletesCorrectIndexes() throws Exception {
+
+        //Mock Cluster Admin
+        ClusterAdminClient clusterAdminClient = mock(ClusterAdminClient.class);
+        ClusterStateRequestBuilder clusterStateRequestBuilder = mock(ClusterStateRequestBuilder.class);
+        ClusterStateResponse clusterStateResponse = mock(ClusterStateResponse.class);
+        ClusterState clusterState = mock(ClusterState.class);
+        ObjectObjectOpenHashMap<String, IndexMetaData> clusterIndexes = new ObjectObjectOpenHashMap();
+        MetaData clusterMetadata = mock(MetaData.class);
+        when(adminClient.cluster()).thenReturn(clusterAdminClient);
+        when(clusterAdminClient.prepareState()).thenReturn(clusterStateRequestBuilder);
+        when(clusterStateRequestBuilder.get()).thenReturn(clusterStateResponse);
+        when(clusterStateResponse.getState()).thenReturn(clusterState);
+        when(clusterState.getMetaData()).thenReturn(clusterMetadata);
+
+        int numDays = 5;
+
+        Date indexDate = new Date();
+
+        indexDate.setTime(testDate.getTime() - TimeUnit.DAYS.toMillis(numDays));
+
+        for (int i = 0; i < numDays * 24; i++) {
+
+            String indexName = "sensor_index_" + dateFormat.format(indexDate);
+            clusterIndexes.put(indexName, null);
+            indexDate.setTime(indexDate.getTime() + TimeUnit.HOURS.toMillis(1));
+
+        }
+
+        when(clusterMetadata.getIndices()).thenReturn(ImmutableOpenMap.copyOf(clusterIndexes));
+
+
+        EasyMock.expect(deleteIndexResponse.isAcknowledged()).andReturn(true);
+
+        replayAll();
+        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 1, configuration, indexClient, "sensor_index_");
+        pruner.indexClient = indexClient;
+        Long deleteCount = pruner.prune();
+        assertEquals("Should have pruned 24 indices", 24L, deleteCount.longValue());
+        verifyAll();
+
+    }
+
+    @Test
+    public void testFilter() throws Exception {
+
+        ObjectObjectOpenHashMap<String, IndexMetaData> indexNames = new ObjectObjectOpenHashMap();
+        SimpleDateFormat dateChecker = new SimpleDateFormat("yyyyMMdd");
+        int numDays = 5;
+        String[] expectedIndices = new String[24];
+        Date indexDate = new Date();
+
+        indexDate.setTime(testDate.getTime() - TimeUnit.DAYS.toMillis(numDays));
+
+        for (int i = 0, j=0; i < numDays * 24; i++) {
+
+            String indexName = "sensor_index_" + dateFormat.format(indexDate);
+            //Delete 20160330
+            if( dateChecker.format(indexDate).equals("20160330") ){
+                expectedIndices[j++] = indexName;
+            }
+
+            indexNames.put(indexName, null);
+            indexDate.setTime(indexDate.getTime() + TimeUnit.HOURS.toMillis(1));
+
+        }
+
+        ImmutableOpenMap<String, IndexMetaData> testIndices = ImmutableOpenMap.copyOf(indexNames);
+
+        ElasticsearchDataPruner pruner = new ElasticsearchDataPruner(testDate, 1, configuration,  indexClient, "sensor_index_");
+        pruner.indexClient = indexClient;
+
+        Iterable<String> filteredIndices = pruner.getFilteredIndices(testIndices);
+
+        Object[] indexArray = IteratorUtils.toArray(filteredIndices.iterator());
+        Arrays.sort(indexArray);
+        Arrays.sort(expectedIndices);
+
+        assertArrayEquals(expectedIndices,indexArray);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java
index b08c2a5..9bc695c 100644
--- a/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java
+++ b/metron-streaming/Metron-DataLoads/src/test/java/org/apache/metron/dataloads/bulk/HDFSDataPrunerTest.java
@@ -30,6 +30,7 @@ import java.util.Date;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.*;
 
@@ -71,11 +72,10 @@ public class HDFSDataPrunerTest {
 
     }
 
-    @Test(expected = RuntimeException.class)
+    @Test(expected = StartDateException.class)
     public void testFailsOnTodaysDate() throws Exception {
 
         HDFSDataPruner pruner = new HDFSDataPruner(todaysDate, 30, "file:///", dataPath.getAbsolutePath() + "/file-*");
-        pruner.prune();
 
     }
 
@@ -114,6 +114,19 @@ public class HDFSDataPrunerTest {
 
     }
 
+    @Test
+    public void testIgnoresDirectoies() throws Exception {
+
+        FileSystem testFS = mock(FileSystem.class);
+        when(testFS.isDirectory((Path) any())).thenReturn(true);
+
+        HDFSDataPruner pruner = new HDFSDataPruner(yesterday, 30, "file:///", dataPath.getAbsolutePath() + "/file-*");
+        pruner.fileSystem = testFS;
+        HDFSDataPruner.DateFileFilter filter = pruner.new DateFileFilter(pruner, false);
+        assertFalse("Should ignore directories",filter.accept(new Path("/tmp")));
+
+    }
+
     @Test(expected = RuntimeException.class)
     public void testThrowBadFile() throws Exception {
 
@@ -122,6 +135,7 @@ public class HDFSDataPrunerTest {
         when(testFS.getFileStatus((Path) any())).thenThrow(new IOException("Test Exception"));
 
         HDFSDataPruner pruner = new HDFSDataPruner(yesterday, 30, "file:///", dataPath.getAbsolutePath() + "/file-*");
+
         pruner.fileSystem = testFS;
         HDFSDataPruner.DateFileFilter filter = pruner.new DateFileFilter(pruner, true);
 

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/resources/log4j2.xml b/metron-streaming/Metron-DataLoads/src/test/resources/log4j2.xml
deleted file mode 100755
index 68d5eac..0000000
--- a/metron-streaming/Metron-DataLoads/src/test/resources/log4j2.xml
+++ /dev/null
@@ -1,31 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<configuration monitorInterval="60">
-  <Appenders>
-    <Console name="Console" target="SYSTEM_OUT">
-     <PatternLayout pattern="%-4r [%t] %-5p %c{1.} - %msg%n"/>
-    </Console>
-  </Appenders>
-  <Loggers>
-    <Root level="error">
-      <AppenderRef ref="Console"/>
-    </Root>
-  </Loggers>
-</configuration>
-

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-DataLoads/src/test/resources/logging.properties
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-DataLoads/src/test/resources/logging.properties b/metron-streaming/Metron-DataLoads/src/test/resources/logging.properties
deleted file mode 100644
index 5798db4..0000000
--- a/metron-streaming/Metron-DataLoads/src/test/resources/logging.properties
+++ /dev/null
@@ -1,17 +0,0 @@
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-#  limitations under the License.
-
-.level= ERROR

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
index 81aeec2..894557c 100644
--- a/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
+++ b/metron-streaming/Metron-Elasticsearch/src/main/java/org/apache/metron/writer/ElasticsearchWriter.java
@@ -60,7 +60,7 @@ public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Seria
       builder.put(optionalSettings);
     }
     client = new TransportClient(builder.build())
-            .addTransportAddress(new InetSocketTransportAddress((String) globalConfiguration.get("es.ip"), (Integer) globalConfiguration.get("es.port")));
+            .addTransportAddress(new InetSocketTransportAddress(globalConfiguration.get("es.ip").toString(), Integer.parseInt(globalConfiguration.get("es.port").toString())));
     dateFormat = new SimpleDateFormat((String) globalConfiguration.get("es.date.format"));
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
index 4f26365..408be2d 100644
--- a/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Elasticsearch/src/test/java/org/apache/metron/integration/ElasticsearchEnrichmentIntegrationTest.java
@@ -34,7 +34,7 @@ import java.util.Properties;
 public class ElasticsearchEnrichmentIntegrationTest extends EnrichmentIntegrationTest {
 
   private String indexDir = "target/elasticsearch";
-  private String dateFormat = "yyyy.MM.dd.hh";
+  private String dateFormat = "yyyy.MM.dd.HH";
   private String index = "yaf_index_" + new SimpleDateFormat(dateFormat).format(new Date());
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
index 2e4e0cc..cd56270 100644
--- a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/integration/EnrichmentIntegrationTest.java
@@ -143,7 +143,7 @@ public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest {
   public void test() throws Exception {
     cleanHdfsDir(hdfsDir);
     final Configurations configurations = SampleUtil.getSampleConfigs();
-    final String dateFormat = "yyyy.MM.dd.hh";
+    final String dateFormat = "yyyy.MM.dd.HH";
     final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath);
     final String cf = "cf";
     final String trackerHBaseTableName = "tracker";

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java
index b15e682..71da674 100644
--- a/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java
+++ b/metron-streaming/Metron-Testing/src/main/java/org/apache/metron/util/SampleUtil.java
@@ -27,7 +27,8 @@ public class SampleUtil {
 
   public static final String sampleConfigRoot = "../Metron-Testing/src/main/resources/sample/config/";
 
-  public static Configurations getSampleConfigs() throws IOException {
+  public static
+  Configurations getSampleConfigs() throws IOException {
     Configurations configurations = new Configurations();
     configurations.updateGlobalConfig(ConfigurationsUtils.readGlobalConfigFromFile(sampleConfigRoot));
     Map<String, byte[]> sensorEnrichmentConfigs = ConfigurationsUtils.readSensorEnrichmentConfigsFromFile(sampleConfigRoot);

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json
----------------------------------------------------------------------
diff --git a/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json b/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json
index b8b449b..721f70f 100644
--- a/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json
+++ b/metron-streaming/Metron-Testing/src/main/resources/sample/config/global.json
@@ -2,7 +2,7 @@
   "es.clustername": "metron",
   "es.ip": "localhost",
   "es.port": 9300,
-  "es.date.format": "yyyy.MM.dd.hh",
+  "es.date.format": "yyyy.MM.dd.HH",
   "solr.zookeeper": "localhost:2181",
   "solr.collection": "metron",
   "solr.numShards": 1,

http://git-wip-us.apache.org/repos/asf/incubator-metron/blob/4931f225/metron-streaming/pom.xml
----------------------------------------------------------------------
diff --git a/metron-streaming/pom.xml b/metron-streaming/pom.xml
index 2b40883..ecd5994 100644
--- a/metron-streaming/pom.xml
+++ b/metron-streaming/pom.xml
@@ -21,13 +21,13 @@
 	<packaging>pom</packaging>
 	<name>Metron-Streaming</name>
 	<description>Stream analytics for Metron</description>
-  <url>https://metron.incubator.apache.org/</url>
-  <scm>
-    <connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-metron.git</connection>
-    <developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-metron.git</developerConnection>
-    <tag>HEAD</tag>
-    <url>https://git-wip-us.apache.org/repos/asf/incubator-metron</url>
-  </scm>
+	<url>https://metron.incubator.apache.org/</url>
+	<scm>
+		<connection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-metron.git</connection>
+		<developerConnection>scm:git:https://git-wip-us.apache.org/repos/asf/incubator-metron.git</developerConnection>
+		<tag>HEAD</tag>
+		<url>https://git-wip-us.apache.org/repos/asf/incubator-metron</url>
+	</scm>
 	<properties>
 		<twitter>@ApacheMetron</twitter>
 		<global_opencsv_version>3.7</global_opencsv_version>
@@ -48,7 +48,7 @@
 		<global_slf4j_version>1.7.7</global_slf4j_version>
 		<global_opencsv_version>3.7</global_opencsv_version>
 		<global_solr_version>5.2.1</global_solr_version>
-		<global_mockito_version>1.10.16</global_mockito_version>
+		<global_mockito_version>1.10.19</global_mockito_version>
 	</properties>
 	<licenses>
 		<license>
@@ -82,6 +82,15 @@
 		<module>Metron-Testing</module>
 		<module>Metron-TestingUtilities</module>
 	</modules>
+	<dependencyManagement>
+		<dependencies>
+			<dependency>
+				<groupId>org.mockito</groupId>
+				<artifactId>mockito-core</artifactId>
+				<version>${global_mockito_version}</version>
+			</dependency>
+		</dependencies>
+	</dependencyManagement>
 	<dependencies>
 		<dependency>
 			<groupId>junit</groupId>
@@ -94,31 +103,83 @@
 			<artifactId>multiline-string</artifactId>
 			<version>0.1.2</version>
 			<scope>test</scope>
-
 		</dependency>
 	</dependencies>
 	<build>
-    <plugins>
-        <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-checkstyle-plugin</artifactId>
-        <version>2.17</version>
-        <executions>
-          <execution>
-            <id>checkstyle</id>
-            <phase>package</phase>
-            <goals>
-              <goal>check</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <configLocation>style/checkstyle.xml</configLocation>
-          <headerLocation>style/LICENSE</headerLocation>
-          <failOnViolation>true</failOnViolation>
-	   <includeTestSourceDirectory>true</includeTestSourceDirectory>
-        </configuration>
-      </plugin>
+		<pluginManagement>
+			<plugins>
+				<plugin>
+					<!-- Separates the unit tests from the integration tests. -->
+					<groupId>org.apache.maven.plugins</groupId>
+					<artifactId>maven-surefire-plugin</artifactId>
+					<version>2.18</version>
+					<configuration>
+						<!-- Skip the default running of this plug-in (or everything is run twice...see below) -->
+						<argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+						<skip>true</skip>
+						<!-- Show 100% of the lines from the stack trace (doesn't work) -->
+						<trimStackTrace>false</trimStackTrace>
+					</configuration>
+					<executions>
+						<execution>
+							<id>unit-tests</id>
+							<phase>test</phase>
+							<goals>
+								<goal>test</goal>
+							</goals>
+							<configuration>
+								<!-- Never skip running the tests when the test phase is invoked -->
+								<skip>false</skip>
+								<includes>
+									<!-- Include unit tests within integration-test phase. -->
+									<include>**/*Test.java</include>
+								</includes>
+								<excludes>
+									<!-- Exclude integration tests within (unit) test phase. -->
+									<exclude>**/*IntegrationTest.java</exclude>
+								</excludes>
+							</configuration>
+						</execution>
+						<execution>
+							<id>integration-tests</id>
+							<phase>integration-test</phase>
+							<goals>
+								<goal>test</goal>
+							</goals>
+							<configuration>
+								<!-- Never skip running the tests when the integration-test phase is invoked -->
+								<skip>false</skip>
+								<includes>
+									<!-- Include integration tests within integration-test phase. -->
+									<include>**/*IntegrationTest.java</include>
+								</includes>
+							</configuration>
+						</execution>
+					</executions>
+				</plugin>
+			</plugins>
+		</pluginManagement>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-checkstyle-plugin</artifactId>
+				<version>2.17</version>
+				<executions>
+					<execution>
+						<id>checkstyle</id>
+						<phase>package</phase>
+						<goals>
+							<goal>check</goal>
+						</goals>
+					</execution>
+				</executions>
+				<configuration>
+					<configLocation>style/checkstyle.xml</configLocation>
+					<headerLocation>style/LICENSE</headerLocation>
+					<failOnViolation>true</failOnViolation>
+					<includeTestSourceDirectory>true</includeTestSourceDirectory>
+				</configuration>
+			</plugin>
 			<plugin>
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-compiler-plugin</artifactId>
@@ -165,8 +226,8 @@
 				<groupId>org.apache.maven.plugins</groupId>
 				<artifactId>maven-surefire-plugin</artifactId>
 				<version>2.18</version>
-        <configuration>
-          <argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
+				<configuration>
+					<argLine>-Xmx2048m -XX:MaxPermSize=256m</argLine>
 					<systemProperties>
 						<property>
 							<name>mode</name>
@@ -200,18 +261,17 @@
 			</plugin>
 		</plugins>
 	</reporting>
-    <repositories>
-        <repository>
-            <id>clojars.org</id>
-            <url>http://clojars.org/repo</url>
-        </repository>
-        <repository>
-            <id>multiline-release-repo</id>
-            <url>https://raw.github.com/benelog/multiline/master/maven-repository</url>
-            <snapshots>
-                <enabled>false</enabled>
-            </snapshots>
-        </repository>
-
+	<repositories>
+		<repository>
+			<id>clojars.org</id>
+			<url>http://clojars.org/repo</url>
+		</repository>
+		<repository>
+			<id>multiline-release-repo</id>
+			<url>https://raw.github.com/benelog/multiline/master/maven-repository</url>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
     </repositories>
 </project>