You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ot...@apache.org on 2018/04/18 15:00:06 UTC

[36/52] [abbrv] metron git commit: METRON-1465:Support for Elasticsearch X-pack (wardbekker via mmiklavc) closes apache/metron#946

METRON-1465:Support for Elasticsearch X-pack (wardbekker via mmiklavc) closes apache/metron#946


Project: http://git-wip-us.apache.org/repos/asf/metron/repo
Commit: http://git-wip-us.apache.org/repos/asf/metron/commit/a8b555dc
Tree: http://git-wip-us.apache.org/repos/asf/metron/tree/a8b555dc
Diff: http://git-wip-us.apache.org/repos/asf/metron/diff/a8b555dc

Branch: refs/heads/feature/METRON-1211-extensions-parsers-gradual
Commit: a8b555dcc9f548d7b91789a46d9435b4d8b17581
Parents: 3ba9ae2
Author: wardbekker <wa...@wardbekker.com>
Authored: Mon Apr 9 13:14:13 2018 -0600
Committer: Michael Miklavcic <mi...@gmail.com>
Committed: Mon Apr 9 13:14:13 2018 -0600

----------------------------------------------------------------------
 metron-deployment/Kerberos-manual-setup.md      | 209 +++++++++++++++++++
 .../roles/metron-builder/tasks/build-debs.yml   |   2 +-
 .../roles/metron-builder/tasks/build-rpms.yml   |   2 +-
 .../METRON/CURRENT/configuration/metron-env.xml |   2 -
 .../metron-rest/src/main/scripts/metron-rest.sh |   9 +
 .../src/main/config/zookeeper/global.json       |   5 +-
 .../apache/metron/common/utils/HDFSUtils.java   |  59 ++++++
 .../metron/common/utils/ReflectionUtils.java    |  66 +++++-
 .../elasticsearch/dao/ElasticsearchDao.java     |  33 ++-
 .../elasticsearch/utils/ElasticsearchUtils.java | 107 ++++++++--
 .../writer/ElasticsearchWriter.java             |   8 +-
 .../scripts/start_elasticsearch_topology.sh     |   8 +-
 .../writer/ElasticsearchWriterTest.java         |  19 +-
 .../stellar/common/utils/ConversionUtils.java   |  19 +-
 14 files changed, 486 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-deployment/Kerberos-manual-setup.md
----------------------------------------------------------------------
diff --git a/metron-deployment/Kerberos-manual-setup.md b/metron-deployment/Kerberos-manual-setup.md
index 47a63d8..456703a 100644
--- a/metron-deployment/Kerberos-manual-setup.md
+++ b/metron-deployment/Kerberos-manual-setup.md
@@ -30,6 +30,7 @@ This document provides instructions for kerberizing Metron's Vagrant-based devel
 * [Start Metron](#start-metron)
 * [Push Data](#push-data)
 * [More Information](#more-information)
+* [Elasticsearch X-Pack](#x-pack)
 
 Setup
 -----
@@ -533,3 +534,211 @@ In order to correct this, you should:
 ### References
 
 * [https://github.com/apache/storm/blob/master/SECURITY.md](https://github.com/apache/storm/blob/master/SECURITY.md)
+
+X-Pack
+------
+
+First, stop the random_access_indexing topology through the Storm UI or from the CLI, e.g.
+
+```
+storm kill random_access_indexing
+```
+
+Here are instructions for enabling X-Pack with Elasticsearch and Kibana: https://www.elastic.co/guide/en/x-pack/5.6/installing-xpack.html
+
+You need to be sure to add the appropriate username and password for Elasticsearch and Kibana to enable external connections from Metron components. For example, the following will create a user "transport_client_user" with password "changeme" and the "superuser" role.
+
+```
+sudo /usr/share/elasticsearch/bin/x-pack/users useradd transport_client_user -p changeme -r superuser
+```
+
+Once you've picked a password to connect to ES, you need to upload a 1-line file to HDFS with that password in it. Metron will read the password from this file in order to connect to ES securely.
+
+Here is an example using "changeme" as the password
+
+```
+echo changeme > /tmp/xpack-password
+sudo -u hdfs hdfs dfs -mkdir /apps/metron/elasticsearch/
+sudo -u hdfs hdfs dfs -put /tmp/xpack-password /apps/metron/elasticsearch/
+sudo -u hdfs hdfs dfs -chown metron:metron /apps/metron/elasticsearch/xpack-password
+```
+
+New settings have been added to configure the Elasticsearch client. By default the client will run as the normal ES prebuilt transport client. If you enable X-Pack you should set the es.client.class as shown below.
+
+Add the es settings to global.json
+
+```
+/usr/metron/0.4.3/config/zookeeper/global.json ->
+
+  "es.client.settings" : {
+      "es.client.class" : "org.elasticsearch.xpack.client.PreBuiltXPackTransportClient",
+      "es.xpack.username" : "transport_client_user",
+      "es.xpack.password.file" : "/apps/metron/elasticsearch/xpack-password"
+  }
+```
+
+Submit the update to Zookeeper
+
+```
+$METRON_HOME/bin/zk_load_configs.sh -m PUSH -i $METRON_HOME/config/zookeeper/ -z $ZOOKEEPER
+```
+
+The last step before restarting the topology is to create a custom X-Pack shaded and relocated jar. This is up to you because of licensing restrictions, but here is a sample Maven pom file that should help.
+
+```
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software
+	Foundation (ASF) under one or more contributor license agreements. See the
+	NOTICE file distributed with this work for additional information regarding
+	copyright ownership. The ASF licenses this file to You under the Apache License,
+	Version 2.0 (the "License"); you may not use this file except in compliance
+	with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
+	Unless required by applicable law or agreed to in writing, software distributed
+	under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES
+	OR CONDITIONS OF ANY KIND, either express or implied. See the License for
+  the specific language governing permissions and limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <groupId>org.elasticsearch</groupId>
+    <artifactId>elasticsearch-xpack-shaded</artifactId>
+    <name>elasticsearch-xpack-shaded</name>
+    <packaging>jar</packaging>
+    <version>5.6.2</version>
+    <repositories>
+        <repository>
+            <id>elasticsearch-releases</id>
+            <url>https://artifacts.elastic.co/maven</url>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+    <dependencies>
+        <dependency>
+            <groupId>org.elasticsearch.client</groupId>
+            <artifactId>x-pack-transport</artifactId>
+            <version>5.6.2</version>
+            <exclusions>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.dataformat</groupId>
+                <artifactId>jackson-dataformat-smile</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.dataformat</groupId>
+                <artifactId>jackson-dataformat-yaml</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.dataformat</groupId>
+                <artifactId>jackson-dataformat-cbor</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-core</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-api</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>org.slf4j</groupId>
+                <artifactId>slf4j-log4j12</artifactId>
+              </exclusion>
+              <exclusion>
+                <groupId>log4j</groupId>
+                <artifactId>log4j</artifactId>
+              </exclusion>
+              <exclusion> <!-- this is causing a weird build error if not excluded - Error creating shaded jar: null: IllegalArgumentException -->
+                    <groupId>org.apache.logging.log4j</groupId>
+                    <artifactId>log4j-api</artifactId>
+                </exclusion>
+            </exclusions>
+          </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.4.3</version>
+                <configuration>
+                    <createDependencyReducedPom>true</createDependencyReducedPom>
+                </configuration>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                          <filters>
+                            <filter>
+                              <artifact>*:*</artifact>
+                              <excludes>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                              </excludes>
+                            </filter>
+                          </filters>
+                          <relocations>
+				<relocation>
+                                    <pattern>io.netty</pattern>
+                                    <shadedPattern>org.apache.metron.io.netty</shadedPattern>
+                                </relocation>
+                                <relocation>
+                                    <pattern>org.apache.logging.log4j</pattern>
+                                    <shadedPattern>org.apache.metron.logging.log4j</shadedPattern>
+                                </relocation>
+                            </relocations>
+                            <artifactSet>
+                                <excludes>
+                                    <exclude>org.slf4j.impl*</exclude>
+                                    <exclude>org.slf4j:slf4j-log4j*</exclude>
+                                </excludes>
+                            </artifactSet>
+                            <transformers>
+                                <transformer
+                                  implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
+                                     <resources>
+                                        <resource>.yaml</resource>
+                                        <resource>LICENSE.txt</resource>
+                                        <resource>ASL2.0</resource>
+                                        <resource>NOTICE.txt</resource>
+                                      </resources>
+                                </transformer>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                                <transformer
+                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass></mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
+```
+
+Once you've built the elasticsearch-xpack-shaded-5.6.2.jar, it needs to be made available to Storm when you submit the topology. Create a contrib directory for indexing and put the jar file in this directory.
+
+```
+/usr/metron/0.4.3/indexing_contrib/elasticsearch-xpack-shaded-5.6.2.jar
+```
+
+Now you can restart the Elasticsearch topology. Note, you should perform this step manually, as follows.
+
+```
+$METRON_HOME/bin/start_elasticsearch_topology.sh
+```
+
+Once you've performed these steps, you should be able to start seeing data in your ES indexes.

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml b/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml
index 4949196..01ab565 100644
--- a/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml
+++ b/metron-deployment/ansible/roles/metron-builder/tasks/build-debs.yml
@@ -20,7 +20,7 @@
   args:
     chdir: "{{ metron_build_dir }}/metron-deployment"
   with_items:
-    - mvn package -DskipTests -Pbuild-debs
+    - mvn package -DskipTests -Pbuild-debs -T 2C
   become: false
   run_once: true
   delegate_to: localhost

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml
----------------------------------------------------------------------
diff --git a/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml b/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml
index c362fc2..7a5b6bd 100644
--- a/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml
+++ b/metron-deployment/ansible/roles/metron-builder/tasks/build-rpms.yml
@@ -20,7 +20,7 @@
   args:
     chdir: "{{ metron_build_dir }}/metron-deployment"
   with_items:
-    - mvn package -DskipTests -Pbuild-rpms
+    - mvn package -DskipTests -Pbuild-rpms -T 2C
   become: false
   run_once: true
   delegate_to: localhost

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
----------------------------------------------------------------------
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
index 87866e8..5c49799 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-env.xml
@@ -137,6 +137,4 @@
         <value>yyyy.MM.dd.HH</value>
         <display-name>Elasticsearch Date Format</display-name>
     </property>
-
-
 </configuration>

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
----------------------------------------------------------------------
diff --git a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
index f9a2b69..c293566 100644
--- a/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
+++ b/metron-interface/metron-rest/src/main/scripts/metron-rest.sh
@@ -36,6 +36,7 @@ METRON_SYSCONFIG="${METRON_SYSCONFIG:-/etc/default/metron}"
 METRON_LOG_DIR="${METRON_LOG_DIR:-/var/log/metron}"
 METRON_PID_FILE="${METRON_PID_FILE:-/var/run/metron/metron-rest.pid}"
 PARSER_CONTRIB=${PARSER_CONTRIB:-$METRON_HOME/parser_contrib}
+INDEXING_CONTRIB=${INDEXING_CONTRIB:-$METRON_HOME/indexing_contrib}
 PARSER_LIB=$(find $METRON_HOME/lib/ -name metron-parsers*.jar)
 
 echo "METRON_VERSION=${METRON_VERSION}"
@@ -65,6 +66,14 @@ if [ -d "$PARSER_CONTRIB" ]; then
   METRON_REST_CLASSPATH+=":${contrib_classpath}"
 fi
 
+if [ -d "$INDEXING_CONTRIB" ]; then
+  contrib_jar_pattern="${INDEXING_CONTRIB}/*.jar"
+  contrib_list=( $contrib_jar_pattern ) # expand the glob to a list
+  contrib_classpath=$(join_by : "${contrib_list[@]}") #join the list by a colon
+  echo "Indexing Contrib jars are: $contrib_classpath"
+  METRON_REST_CLASSPATH+=":${contrib_classpath}"
+fi
+
 echo "METRON_SPRING_PROFILES_ACTIVE=${METRON_SPRING_PROFILES_ACTIVE}"
 
 # the vagrant Spring profile provides configuration values, otherwise configuration is provided by rest_application.yml

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-common/src/main/config/zookeeper/global.json
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/config/zookeeper/global.json b/metron-platform/metron-common/src/main/config/zookeeper/global.json
index dc7e71f..9e5402e 100644
--- a/metron-platform/metron-common/src/main/config/zookeeper/global.json
+++ b/metron-platform/metron-common/src/main/config/zookeeper/global.json
@@ -4,5 +4,8 @@
   "es.date.format": "yyyy.MM.dd.HH",
   "parser.error.topic": "indexing",
   "update.hbase.table": "metron_update",
-  "update.hbase.cf": "t"
+  "update.hbase.cf": "t",
+  "es.client.settings": {
+    "client.transport.ping_timeout": "500s"
+  }
 }

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java
new file mode 100644
index 0000000..ee00b7e
--- /dev/null
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/HDFSUtils.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.common.utils;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+public class HDFSUtils {
+
+  /**
+   * Reads full HDFS FS file contents into a List of Strings. Initializes file system with default
+   * configuration. Opens and closes the file system on each call. Never null.
+   *
+   * @param path path to file
+   * @return file contents as a String
+   * @throws IOException
+   */
+  public static List<String> readFile(String path) throws IOException {
+    return readFile(new Configuration(), path);
+  }
+
+  /**
+   * Reads full HDFS FS file contents into a String. Opens and closes the file system on each call.
+   * Never null.
+   *
+   * @param config Hadoop configuration
+   * @param path path to file
+   * @return file contents as a String
+   * @throws IOException
+   */
+  public static List<String> readFile(Configuration config, String path) throws IOException {
+    FileSystem fs = FileSystem.newInstance(config);
+    Path hdfsPath = new Path(path);
+    FSDataInputStream inputStream = fs.open(hdfsPath);
+    return IOUtils.readLines(inputStream, "UTF-8");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
index 144cdd9..ee6b041 100644
--- a/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
+++ b/metron-platform/metron-common/src/main/java/org/apache/metron/common/utils/ReflectionUtils.java
@@ -23,19 +23,62 @@ public class ReflectionUtils {
 
   public static <T> T createInstance(String className, T defaultClass) {
     T instance;
-    if(className == null || className.length() == 0 || className.charAt(0) == '$') {
+    if (className == null || className.length() == 0 || className.charAt(0) == '$') {
       return defaultClass;
-    }
-    else {
+    } else {
       instance = createInstance(className);
     }
     return instance;
   }
 
+  /**
+   * Attempts to create instance from specified class name. No-arg constructor assumed.
+   *
+   * @param className fully qualified name of class to instantiate. e.g. foo.bar.Baz
+   * @param <T> Instance created from passed class
+   * @return Object of type T
+   */
+  public static <T> T createInstance(String className) {
+    T instance;
+    try {
+      Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
+      instance = createInstance(clazz);
+    } catch (ClassNotFoundException e) {
+      throw new IllegalStateException("Unable to instantiate connector: class not found", e);
+    }
+    return instance;
+  }
+
+
+  /**
+   * Create instance from no-args constructor
+   *
+   * @param clazz Class to create instance from
+   * @param <T> Instance created from passed class
+   * @return Object of type T
+   */
   public static <T> T createInstance(Class<? extends T> clazz) {
+    return createInstance(clazz, null, null);
+  }
+
+  /**
+   * Create instance from passed class with specified parameter types and arguments. If parameter
+   * types is null, defaults to attempting to instantiate the no-arg constructor.
+   *
+   * @param clazz Class to create instance from
+   * @param parameterTypes parameter types to use for looking up the desired constructor
+   * @param parameters arguments to pass into the constructor when instantiating the object.
+   * @param <T> Instance created from passed class
+   * @return Object of type T
+   */
+  public static <T> T createInstance(Class<? extends T> clazz, Class<?>[] parameterTypes, Object[] parameters) {
     T instance;
     try {
-      instance = clazz.getConstructor().newInstance();
+      if (parameterTypes != null) {
+        instance = clazz.getConstructor(parameterTypes).newInstance(parameters);
+      } else {
+        instance = clazz.getConstructor().newInstance();
+      }
     } catch (InstantiationException e) {
       throw new IllegalStateException("Unable to instantiate connector.", e);
     } catch (IllegalAccessException e) {
@@ -47,11 +90,22 @@ public class ReflectionUtils {
     }
     return instance;
   }
-  public static <T> T createInstance(String className) {
+
+  /**
+   * Create instance from passed class name with specified parameter types and arguments. If parameter
+   * types is null, defaults to attempting to instantiate the no-arg constructor.
+   *
+   * @param className fully qualified name of class to instantiate. e.g. foo.bar.Baz
+   * @param parameterTypes parameter types to use for looking up the desired constructor
+   * @param parameters arguments to pass into the constructor when instantiating the object.
+   * @param <T> Instance created from passed class
+   * @return Object of type T
+   */
+  public static <T> T createInstance(String className, Class<?>[] parameterTypes, Object[] parameters) {
     T instance;
     try {
       Class<? extends T> clazz = (Class<? extends T>) Class.forName(className);
-      instance = createInstance(clazz);
+      instance = createInstance(clazz, parameterTypes, parameters);
     } catch (ClassNotFoundException e) {
       throw new IllegalStateException("Unable to instantiate connector: class not found", e);
     }

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
index 26e5731..cb5bb58 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/dao/ElasticsearchDao.java
@@ -17,8 +17,23 @@
  */
 package org.apache.metron.elasticsearch.dao;
 
+import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
+
 import com.google.common.base.Splitter;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
 import org.apache.metron.elasticsearch.utils.ElasticsearchUtils;
 import org.apache.metron.indexing.dao.AccessConfig;
 import org.apache.metron.indexing.dao.IndexDao;
@@ -65,22 +80,6 @@ import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.lang.invoke.MethodHandles;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.function.Function;
-
-import static org.apache.metron.elasticsearch.utils.ElasticsearchUtils.INDEX_NAME_DELIMITER;
-
 public class ElasticsearchDao implements IndexDao {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -391,7 +390,7 @@ public class ElasticsearchDao implements IndexDao {
   @Override
   public synchronized void init(AccessConfig config) {
     if(this.client == null) {
-      this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get(), config.getOptionalSettings());
+      this.client = ElasticsearchUtils.getClient(config.getGlobalConfigSupplier().get());
       this.accessConfig = config;
       this.columnMetadataDao = new ElasticsearchColumnMetadataDao(this.client.admin());
       this.requestSubmitter = new ElasticsearchRequestSubmitter(this.client);

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
index 4b73b84..24f7a27 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/utils/ElasticsearchUtils.java
@@ -22,6 +22,7 @@ import static java.lang.String.format;
 import com.google.common.base.Splitter;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import java.io.IOException;
 import java.lang.invoke.MethodHandles;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -30,25 +31,34 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import org.apache.commons.lang.StringUtils;
 import org.apache.metron.common.configuration.writer.WriterConfiguration;
+import org.apache.metron.common.utils.HDFSUtils;
+import org.apache.metron.common.utils.ReflectionUtils;
 import org.apache.metron.netty.utils.NettyRuntimeWrapper;
+import org.apache.metron.stellar.common.utils.ConversionUtils;
 import org.codehaus.jackson.map.ObjectMapper;
 import org.elasticsearch.client.transport.TransportClient;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.InetSocketTransportAddress;
 import org.elasticsearch.common.xcontent.XContentHelper;
-import org.elasticsearch.transport.client.PreBuiltTransportClient;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class ElasticsearchUtils {
 
   private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+  private static final String ES_CLIENT_CLASS_DEFAULT = "org.elasticsearch.transport.client.PreBuiltTransportClient";
+  private static final String PWD_FILE_CONFIG_KEY = "es.xpack.password.file";
+  private static final String USERNAME_CONFIG_KEY = "es.xpack.username";
+  private static final String TRANSPORT_CLIENT_USER_KEY = "xpack.security.user";
+
 
   private static ThreadLocal<Map<String, SimpleDateFormat>> DATE_FORMAT_CACHE
           = ThreadLocal.withInitial(() -> new HashMap<>());
@@ -107,32 +117,103 @@ public class ElasticsearchUtils {
     return parts[0];
   }
 
-  public static TransportClient getClient(Map<String, Object> globalConfiguration, Map<String, String> optionalSettings) {
+  /**
+   * Instantiates an Elasticsearch client of the type named by es.client.class in the
+   * es.client.settings map; defaults to org.elasticsearch.transport.client.PreBuiltTransportClient.
+   *
+   * @param globalConfiguration Metron global config
+   * @return transport client built from the resolved settings, with the configured transport addresses added
+   */
+  public static TransportClient getClient(Map<String, Object> globalConfiguration) {
+    Set<String> customESSettings = new HashSet<>();
+    customESSettings.addAll(Arrays.asList("es.client.class", USERNAME_CONFIG_KEY, PWD_FILE_CONFIG_KEY));
     Settings.Builder settingsBuilder = Settings.builder();
-    settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));
-    settingsBuilder.put("client.transport.ping_timeout","500s");
-    if (optionalSettings != null) {
-      settingsBuilder.put(optionalSettings);
+    Map<String, String> esSettings = getEsSettings(globalConfiguration);
+    for (Map.Entry<String, String> entry : esSettings.entrySet()) {
+      String key = entry.getKey();
+      String value = entry.getValue();
+      if (!customESSettings.contains(key)) {
+        settingsBuilder.put(key, value);
+      }
     }
-    Settings settings = settingsBuilder.build();
-    TransportClient client;
-    try{
+    settingsBuilder.put("cluster.name", globalConfiguration.get("es.clustername"));
+    settingsBuilder.put("client.transport.ping_timeout", esSettings.getOrDefault("client.transport.ping_timeout","500s"));
+    setXPackSecurityOrNone(settingsBuilder, esSettings);
+
+    try {
       LOG.info("Number of available processors in Netty: {}", NettyRuntimeWrapper.availableProcessors());
       // Netty sets available processors statically and if an attempt is made to set it more than
       // once an IllegalStateException is thrown by NettyRuntime.setAvailableProcessors(NettyRuntime.java:87)
       // https://discuss.elastic.co/t/getting-availableprocessors-is-already-set-to-1-rejecting-1-illegalstateexception-exception/103082
       // https://discuss.elastic.co/t/elasticsearch-5-4-1-availableprocessors-is-already-set/88036
       System.setProperty("es.set.netty.runtime.available.processors", "false");
-      client = new PreBuiltTransportClient(settings);
-      for(HostnamePort hp : getIps(globalConfiguration)) {
+      TransportClient client = createTransportClient(settingsBuilder.build(), esSettings);
+      for (HostnamePort hp : getIps(globalConfiguration)) {
         client.addTransportAddress(
                 new InetSocketTransportAddress(InetAddress.getByName(hp.hostname), hp.port)
         );
       }
-    } catch (UnknownHostException exception){
+      return client;
+    } catch (UnknownHostException exception) {
       throw new RuntimeException(exception);
     }
-    return client;
+  }
+
+  private static Map<String, String> getEsSettings(Map<String, Object> config) {
+    return ConversionUtils
+        .convertMap((Map<String, Object>) config.getOrDefault("es.client.settings", new HashMap<String, Object>()),
+            String.class);
+  }
+
+  /*
+   * Sets the X-Pack user credentials when a password file is configured (a non-empty username is then required); otherwise does nothing.
+   */
+  private static void setXPackSecurityOrNone(Settings.Builder settingsBuilder, Map<String, String> esSettings) {
+
+    if (esSettings.containsKey(PWD_FILE_CONFIG_KEY)) {
+
+      if (!esSettings.containsKey(USERNAME_CONFIG_KEY) || StringUtils.isEmpty(esSettings.get(USERNAME_CONFIG_KEY))) {
+        throw new IllegalArgumentException("X-pack username is required and cannot be empty");
+      }
+
+      settingsBuilder.put(
+         TRANSPORT_CLIENT_USER_KEY,
+         esSettings.get(USERNAME_CONFIG_KEY) + ":" + getPasswordFromFile(esSettings.get(PWD_FILE_CONFIG_KEY))
+      );
+    }
+  }
+
+  /*
+   * Reads the password from the given HDFS file; the password is expected alone on the first line.
+   */
+  private static String getPasswordFromFile(String hdfsPath) {
+    List<String> lines = null;
+    try {
+      lines = HDFSUtils.readFile(hdfsPath);
+    } catch (IOException e) {
+      throw new IllegalArgumentException(
+          format("Unable to read XPack password file from HDFS location '%s'", hdfsPath), e);
+    }
+    if (lines.size() == 0) {
+      throw new IllegalArgumentException(format("No password found in file '%s'", hdfsPath));
+    }
+    return lines.get(0);
+  }
+
+  /**
+   * Constructs the ES transport client from the provided client settings and additional ES config.
+   *
+   * @param settings client settings
+   * @param esSettings additional ES config; es.client.class names the client type to instantiate
+   * @return client with provided settings
+   */
+  private static TransportClient createTransportClient(Settings settings,
+      Map<String, String> esSettings) {
+    String esClientClassName = (String) esSettings
+        .getOrDefault("es.client.class", ES_CLIENT_CLASS_DEFAULT);
+    return ReflectionUtils
+        .createInstance(esClientClassName, new Class[]{Settings.class, Class[].class},
+            new Object[]{settings, new Class[0]});
   }
 
   public static class HostnamePort {

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
index 143bcf7..5959623 100644
--- a/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
+++ b/metron-platform/metron-elasticsearch/src/main/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriter.java
@@ -42,21 +42,15 @@ import org.slf4j.LoggerFactory;
 
 public class ElasticsearchWriter implements BulkMessageWriter<JSONObject>, Serializable {
 
-  private Map<String, String> optionalSettings;
   private transient TransportClient client;
   private SimpleDateFormat dateFormat;
   private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchWriter.class);
   private FieldNameConverter fieldNameConverter = new ElasticsearchFieldNameConverter();
 
-  public ElasticsearchWriter withOptionalSettings(Map<String, String> optionalSettings) {
-    this.optionalSettings = optionalSettings;
-    return this;
-  }
-
   @Override
   public void init(Map stormConf, TopologyContext topologyContext, WriterConfiguration configurations) {
     Map<String, Object> globalConfiguration = configurations.getGlobalConfig();
-    client = ElasticsearchUtils.getClient(globalConfiguration, optionalSettings);
+    client = ElasticsearchUtils.getClient(globalConfiguration);
     dateFormat = ElasticsearchUtils.getIndexFormat(globalConfiguration);
   }
 

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
index 1b473e9..20ce23f 100755
--- a/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
+++ b/metron-platform/metron-elasticsearch/src/main/scripts/start_elasticsearch_topology.sh
@@ -19,4 +19,10 @@
 METRON_VERSION=${project.version}
 METRON_HOME=/usr/metron/$METRON_VERSION
 TOPOLOGY_JAR=${project.artifactId}-$METRON_VERSION-uber.jar
-storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties
+INDEXING_CONTRIB=${INDEXING_CONTRIB:-$METRON_HOME/indexing_contrib}
+if [ -d "$INDEXING_CONTRIB" ]; then
+  export EXTRA_JARS=$(ls -m $INDEXING_CONTRIB/*.jar | tr -d ' ' | tr -d '\n' | sed 's/\/\//\//g')
+  storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties --jars "$EXTRA_JARS"
+else
+  storm jar $METRON_HOME/lib/$TOPOLOGY_JAR org.apache.storm.flux.Flux --remote $METRON_HOME/flux/indexing/random_access/remote.yaml --filter $METRON_HOME/config/elasticsearch.properties
+fi

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
----------------------------------------------------------------------
diff --git a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
index 9aff560..6a3638b 100644
--- a/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
+++ b/metron-platform/metron-elasticsearch/src/test/java/org/apache/metron/elasticsearch/writer/ElasticsearchWriterTest.java
@@ -18,23 +18,20 @@
 
 package org.apache.metron.elasticsearch.writer;
 
-import org.apache.storm.tuple.Tuple;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
 import com.google.common.collect.ImmutableList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import org.apache.metron.common.writer.BulkWriterResponse;
+import org.apache.storm.tuple.Tuple;
 import org.elasticsearch.action.bulk.BulkItemResponse;
 import org.elasticsearch.action.bulk.BulkResponse;
 import org.junit.Test;
 
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.junit.Assert.*;
-
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
 public class ElasticsearchWriterTest {
     @Test
     public void testSingleSuccesses() throws Exception {

http://git-wip-us.apache.org/repos/asf/metron/blob/a8b555dc/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java
----------------------------------------------------------------------
diff --git a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java
index b53097f..783afae 100644
--- a/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java
+++ b/metron-stellar/stellar-common/src/main/java/org/apache/metron/stellar/common/utils/ConversionUtils.java
@@ -19,11 +19,12 @@
 package org.apache.metron.stellar.common.utils;
 
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import java.util.List;
+import java.util.Map;
 import org.apache.commons.beanutils.BeanUtilsBean2;
 import org.apache.commons.beanutils.ConvertUtilsBean;
 
-import java.util.List;
-
 public class ConversionUtils {
   private static ThreadLocal<ConvertUtilsBean> UTILS_BEAN = new ThreadLocal<ConvertUtilsBean>() {
     @Override
@@ -55,4 +56,18 @@ public class ConversionUtils {
     return Lists.transform(from, s -> convert(s, clazz));
   }
 
+  /**
+   * Performs naive Map type conversion on values. Key types remain unchanged.
+   *
+   * @param from Source map
+   * @param clazz Class type to convert the Map values to
+   * @param <K> Map key type
+   * @param <V1> Source value type
+   * @param <V2> Desired value type
+   * @return Map view over {@code from} whose values are converted to the desired type
+   */
+  public static <K, V1, V2> Map<K, V2> convertMap(Map<K, V1> from, Class<V2> clazz) {
+    return Maps.transformValues(from, s -> convert(s, clazz));
+  }
+
 }