Posted to commits@metron.apache.org by ni...@apache.org on 2019/10/30 16:31:03 UTC

[metron] branch feature/METRON-2088-support-hdp-3.1 updated: METRON-2232 Upgrade to Hadoop 3.1.1 (mmiklavc via nickwallen) closes apache/metron#1523

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
     new b1ca4f3  METRON-2232 Upgrade to Hadoop 3.1.1 (mmiklavc via nickwallen) closes apache/metron#1523
b1ca4f3 is described below

commit b1ca4f31cee956d9a547af2c83aa2ead898ff9c0
Author: mmiklavc <mi...@gmail.com>
AuthorDate: Wed Oct 30 12:30:41 2019 -0400

    METRON-2232 Upgrade to Hadoop 3.1.1 (mmiklavc via nickwallen) closes apache/metron#1523
---
 dependencies_with_url.csv                          |  49 +-
 dev-utilities/build-utils/list_dependencies.sh     |   2 +-
 metron-analytics/metron-maas-service/pom.xml       |  16 +-
 .../metron/maas/service/ApplicationMaster.java     |   1 +
 .../service/callback/ContainerRequestListener.java |   4 +
 .../apache/metron/maas/submit/ModelSubmission.java |  38 +-
 .../metron/maas/service/MaasIntegrationTest.java   |  30 +-
 metron-analytics/metron-profiler-spark/pom.xml     |  27 +
 .../metron/rest/service/impl/GrokServiceImpl.java  |   4 +-
 metron-platform/metron-common/pom.xml              |   5 -
 .../FanOutOneBlockAsyncDFSOutputHelper.java        | 899 +++++++++++++++++++++
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java    | 782 ++++++++++++++++++
 metron-platform/metron-hbase-server/pom.xml        |  28 +-
 .../FanOutOneBlockAsyncDFSOutputHelper.java        | 899 +++++++++++++++++++++
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java    | 783 ++++++++++++++++++
 .../EnrichmentCoprocessorIntegrationTest.java      |   4 +-
 .../metron-hbase/metron-hbase-common/pom.xml       |   2 -
 .../FanOutOneBlockAsyncDFSOutputHelper.java        | 899 +++++++++++++++++++++
 .../FanOutOneBlockAsyncDFSOutputSaslHelper.java    | 783 ++++++++++++++++++
 .../metron-indexing/metron-indexing-common/pom.xml |  11 +-
 .../metron/spout/pcap/PartitionHDFSWriter.java     |   2 +-
 metron-stellar/stellar-common/pom.xml              |   2 +-
 pom.xml                                            |   1 +
 23 files changed, 5218 insertions(+), 53 deletions(-)

diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 623a021..59eb838 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -1,5 +1,6 @@
 com.jakewharton.fliptables:fliptables:jar:1.0.2:compile,Apache v2,https://github.com/JakeWharton/flip-tables
 org.jboss.aesh:aesh:jar:0.66.19:compile,Apache v2,https://github.com/aeshell/aesh
+org.objenesis:objenesis:jar:1.0:compile,Apache v2,http://objenesis.org/
 org.objenesis:objenesis:jar:1.2:compile,Apache v2,http://objenesis.org/
 org.objenesis:objenesis:jar:2.1:compile,Apache v2,http://objenesis.org/
 org.ow2.asm:asm:jar:4.1:compile,BSD,http://asm.ow2.org/
@@ -26,6 +27,7 @@ com.google.protobuf:protobuf-java:jar:2.5.0:compile,New BSD license,http://code.
 com.google.protobuf:protobuf-java:jar:2.6.1:compile,New BSD license,http://code.google.com/p/protobuf
 com.google.protobuf:protobuf-java:jar:3.1.0:compile,New BSD license,http://code.google.com/p/protobuf
 com.jcraft:jsch:jar:0.1.42:compile,BSD,http://www.jcraft.com/jsch/
+com.jcraft:jsch:jar:0.1.54:compile,BSD,http://www.jcraft.com/jsch/
 com.jayway.jsonpath:json-path:jar:2.3.0:compile,Apache v2,https://github.com/json-path/JsonPath
 com.jayway.jsonpath:json-path:jar:2.4.0:compile,Apache v2,https://github.com/json-path/JsonPath
 net.minidev:accessors-smart:jar:1.2:compile,Apache v2,https://github.com/netplex/json-smart-v2
@@ -48,6 +50,7 @@ javax.mail:mail:jar:1.4:compile,Common Development and Distribution License (CDD
 javax.mail:mail:jar:1.4.1:compile,Common Development and Distribution License (CDDL) v1.0,https://glassfish.dev.java.net/javaee5/mail/
 javax.servlet:javax.servlet-api:jar:3.1.0:compile,CDDL,http://servlet-spec.java.net
 javax.ws.rs:javax.ws.rs-api:jar:2.0.1:compile,CDDL 1.1,https://github.com/jax-rs/api
+javax.ws.rs:jsr311-api:jar:1.1.1:compile,CDDL 1.1,https://github.com/javaee/jsr311
 javax.xml.bind:jaxb-api:jar:2.2.11:compile,CDDL,http://jaxb.java.net/
 javax.xml.bind:jaxb-api:jar:2.2.2:compile,CDDL,https://jaxb.dev.java.net/
 javax.xml.bind:jaxb-api:jar:2.3.0:compile,CDDL,https://jaxb.dev.java.net/
@@ -72,8 +75,12 @@ org.clojure:clojure:jar:1.6.0:compile,Eclipse Public License 1.0,http://clojure.
 org.clojure:clojure:jar:1.7.0:compile,Eclipse Public License 1.0,http://clojure.org/
 org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org
 org.codehaus.jackson:jackson-jaxrs:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-jaxrs:jar:1.9.2:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
 org.codehaus.jackson:jackson-xc:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org
 org.codehaus.jackson:jackson-xc:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-mapper-asl:jar:1.9.2:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-xc:jar:1.9.2:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
 org.codehaus.janino:commons-compiler:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino
 org.codehaus.janino:janino:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino
 org.codehaus.woodstox:stax2-api:jar:3.1.4:compile,The BSD License,http://wiki.fasterxml.com/WoodstoxStax2
@@ -101,11 +108,17 @@ org.scala-lang:scalap:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/
 oro:oro:jar:2.0.8:compile,ASLv2,http://attic.apache.org/projects/jakarta-oro.html
 xmlenc:xmlenc:jar:0.52:compile,The BSD License,http://xmlenc.sourceforge.net
 asm:asm:jar:3.1:compile,BSD,http://asm.ow2.org/
+com.sun.jersey.contribs:jersey-guice:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
 com.sun.jersey.contribs:jersey-guice:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-client:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
 com.sun.jersey:jersey-client:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-core:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
 com.sun.jersey:jersey-core:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-json:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
 com.sun.jersey:jersey-json:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-server:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
 com.sun.jersey:jersey-server:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-servlet:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
 com.thoughtworks.paranamer:paranamer:jar:2.3:compile,BSD,https://github.com/paul-hammant/paranamer
 javax.servlet.jsp:jsp-api:jar:2.1:runtime,CDDL,http://oracle.com
 javax.servlet.jsp:jsp-api:jar:2.1:compile,CDDL,http://oracle.com
@@ -161,6 +174,7 @@ com.codahale.metrics:metrics-core:jar:3.0.2:compile,MIT,https://github.com/codah
 com.codahale.metrics:metrics-graphite:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics
 com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile,BSD,https://github.com/EsotericSoftware/reflectasm
 com.fasterxml.jackson.core:jackson-annotations:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
+com.fasterxml.jackson.core:jackson-annotations:jar:2.7.0:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-annotations:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-annotations:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile,ASLv2,http://github.com/FasterXML/jackson
@@ -169,6 +183,7 @@ com.fasterxml.jackson.core:jackson-core:jar:2.2.3:compile,ASLv2,http://wiki.fast
 com.fasterxml.jackson.core:jackson-core:jar:2.6.3:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.7.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
+com.fasterxml.jackson.core:jackson-core:jar:2.7.8:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.8.3:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.9.2:compile,ASLv2,https://github.com/FasterXML/jackson-core
 com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
@@ -177,6 +192,7 @@ com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.
 com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.6.7.1:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-databind:jar:2.7.8:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.9.2:compile,ASLv2,http://github.com/FasterXML/jackson
 com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson
@@ -196,10 +212,13 @@ com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.5:compile,ASLv2,h
 com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
 com.fasterxml.jackson.module:jackson-module-paranamer:jar:2.7.9:compile,ASLv2,https://github.com/FasterXML/jackson-modules-base
 com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.6.7.1:compile,ASLv2,https://github.com/FasterXML/jackson-module-scala
+com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.7.8:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
 com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
 com.fasterxml.woodstox:woodstox-core:jar:5.0.3:compile,ASLv2,https://github.com/FasterXML/woodstox
 com.fasterxml:classmate:jar:1.3.1:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
 com.fasterxml:classmate:jar:1.3.4:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
+com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar:2.7.8:compile,ASLv2,https://github.com/FasterXML/jackson-jaxrs-providers
+com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar:2.7.8:compile,ASLv2,https://github.com/FasterXML/jackson-jaxrs-providers
 com.google.code.gson:gson:jar:2.2.2:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
 com.google.code.gson:gson:jar:2.2.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
 com.google.code.gson:gson:jar:2.7:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
@@ -211,8 +230,11 @@ com.google.guava:guava:jar:14.0.1:compile,ASLv2,
 com.google.guava:guava:jar:16.0.1:compile,ASLv2,
 com.google.guava:guava:jar:17.0:compile,ASLv2,
 com.google.guava:guava:jar:18.0:compile,ASLv2,
-com.google.inject.extensions:guice-servlet:jar:3.0:compile,ASLv2,
-com.google.inject:guice:jar:3.0:compile,ASLv2,
+com.google.inject.extensions:guice-servlet:jar:3.0:compile,ASLv2,https://github.com/google/guice
+com.google.inject.extensions:guice-servlet:jar:4.0:compile,ASLv2,https://github.com/google/guice
+com.google.inject:guice:jar:3.0:compile,ASLv2,https://github.com/google/guice
+com.google.inject:guice:jar:4.0:compile,ASLv2,https://github.com/google/guice
+com.google.re2j:re2j:jar:1.1:compile,BSD,https://github.com/google/re2j
 com.googlecode.disruptor:disruptor:jar:2.10.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/disruptor/
 com.lmax:disruptor:jar:3.3.0:compile,The Apache Software License, Version 2.0,https://github.com/LMAX-Exchange/disruptor/
 com.lmax:disruptor:jar:3.3.2:compile,The Apache Software License, Version 2.0,https://github.com/LMAX-Exchange/disruptor/
@@ -234,6 +256,7 @@ commons-beanutils:commons-beanutils-core:jar:1.8.0:provided,ASLv2,http://commons
 commons-beanutils:commons-beanutils:jar:1.7.0:compile,ASLv2,
 commons-beanutils:commons-beanutils:jar:1.8.3:compile,ASLv2,http://commons.apache.org/beanutils/
 commons-beanutils:commons-beanutils:jar:1.9.2:compile,ASLv2,http://commons.apache.org/beanutils/
+commons-beanutils:commons-beanutils:jar:1.9.3:compile,ASLv2,http://commons.apache.org/beanutils/
 commons-cli:commons-cli:jar:1.2:compile,ASLv2,http://commons.apache.org/cli/
 commons-cli:commons-cli:jar:1.3.1:compile,ASLv2,http://commons.apache.org/proper/commons-cli/
 commons-codec:commons-codec:jar:1.10:compile,ASLv2,http://commons.apache.org/proper/commons-codec/
@@ -269,6 +292,7 @@ commons-logging:commons-logging:jar:1.2:compile,ASLv2,http://commons.apache.org/
 commons-net:commons-net:jar:2.2:compile,ASLv2,http://commons.apache.org/net/
 commons-net:commons-net:jar:3.1:compile,ASLv2,http://commons.apache.org/net/
 commons-net:commons-net:jar:3.1:provided,ASLv2,http://commons.apache.org/net/
+commons-net:commons-net:jar:3.6:compile,ASLv2,http://commons.apache.org/net/
 commons-pool:commons-pool:jar:1.5.4:compile,ASLv2,http://commons.apache.org/proper/commons-pool/
 commons-text:commons-text:jar:1.1:compile,ASLv2,http://commons.apache.org/proper/commons-text/
 commons-validator:commons-validator:jar:1.4.0:compile,ASLv2,http://commons.apache.org/validator/
@@ -288,10 +312,11 @@ io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile,ASLv2,https://github.co
 io.dropwizard.metrics:metrics-graphite:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
 io.dropwizard.metrics:metrics-json:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
 io.dropwizard.metrics:metrics-jvm:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
-io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2,
-io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2,
-io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2,
-io.netty:netty-all:jar:4.1.23.Final:compile,ASLv2,
+io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2,https://netty.io/
+io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2,https://netty.io/
+io.netty:netty-all:jar:4.0.52.Final:compile,ASLv2,https://netty.io/
+io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2,https://netty.io/
+io.netty:netty-all:jar:4.1.23.Final:compile,ASLv2,https://netty.io/
 io.netty:netty:jar:3.6.2.Final:compile,Apache License, Version 2.0,http://netty.io/
 io.netty:netty:jar:3.7.0.Final:compile,Apache License, Version 2.0,http://netty.io/
 io.netty:netty:jar:3.9.9.Final:compile,Apache License, Version 2.0,http://netty.io/
@@ -312,7 +337,7 @@ net.jpountz.lz4:lz4:jar:1.3.0:compile,The Apache Software License, Version 2.0,h
 net.sf.py4j:py4j:jar:0.10.7:compile,,
 nl.jqno.equalsverifier:equalsverifier:jar:2.0.2:compile,The Apache Software License, Version 2.0,http://www.jqno.nl/equalsverifier
 org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
-org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-core-asl:jar:1.9.2:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
 org.codehaus.woodstox:stax2-api:jar:3.1.4:compile,The BSD License,http://wiki.fasterxml.com/WoodstoxStax2
 org.codehaus.woodstox:woodstox-core-asl:jar:4.4.1:compile,ASLv2,http://woodstox.codehaus.org
 org.eclipse.jetty:jetty-http:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
@@ -390,6 +415,7 @@ org.springframework.ldap:spring-ldap-core:jar:2.3.2.RELEASE:compile,ASLv2,https:
 org.springframework.security:spring-security-ldap:jar:5.1.1.RELEASE:compile,ASLv2,https://spring.io/projects/spring-security
 org.tukaani:xz:jar:1.0:compile,Public Domain,http://tukaani.org/xz/java.html
 org.xerial.snappy:snappy-java:jar:1.0.4.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/snappy-java/
+org.xerial.snappy:snappy-java:jar:1.0.5:compile,The Apache Software License, Version 2.0,http://code.google.com/p/snappy-java/
 org.xerial.snappy:snappy-java:jar:1.1.1.7:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
 org.xerial.snappy:snappy-java:jar:1.1.2.6:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
 org.xerial.snappy:snappy-java:jar:1.1.7.1:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
@@ -465,7 +491,9 @@ de.jollyday:jollyday:jar:0.5.2:compile,ASLv2,http://jollyday.sourceforge.net/lic
 org.threeten:threeten-extra:jar:1.0:compile,BSD,http://www.threeten.org/threeten-extra/license.html
 org.atteo.classindex:classindex:jar:3.3:compile,ASLv2,https://github.com/atteo/classindex
 com.squareup.okhttp:okhttp:jar:2.4.0:compile,ASLv2,https://github.com/square/okhttp
+com.squareup.okhttp:okhttp:jar:2.7.5:compile,ASLv2,https://github.com/square/okhttp
 com.squareup.okio:okio:jar:1.4.0:compile,ASLv2,https://github.com/square/okhttp
+com.squareup.okio:okio:jar:1.6.0:compile,ASLv2,https://github.com/square/okhttp
 org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.org/
 net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek
 org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
@@ -541,6 +569,7 @@ org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile
 org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
 org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
 com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
+com.zaxxer:HikariCP-java7:jar:2.4.12:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
 org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator
 com.github.palindromicity:simple-syslog:jar:0.0.3:compile,ASLv2,https://github.com/palindromicity/simple-syslog
 org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:5.6.14:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
@@ -550,6 +579,7 @@ com.vividsolutions:jts:jar:1.13:compile
 joda-time:joda-time:jar:2.10:compile
 org.elasticsearch:securesm:jar:1.2:compile
 com.github.stephenc.jcip:jcip-annotations:jar:1.0-1:compile,ASLv2,http://stephenc.github.io/jcip-annotations/
+com.nimbusds:nimbus-jose-jwt:jar:4.41.1:compile,ASLv2,https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home
 com.nimbusds:nimbus-jose-jwt:jar:4.41.2:compile,ASLv2,https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home
 tomcat:jasper-compiler:jar:5.5.23:compile,ASLv2,https://tomcat.apache.org/
 org.danilopianini:gson-extras:jar:0.2.1:compile,ASLv2,https://github.com/DanySK/gson-extras
@@ -576,3 +606,8 @@ org.glassfish.jersey.core:jersey-server:jar:2.25.1:compile,Eclipse Public Licens
 org.glassfish.jersey.media:jersey-media-jaxb:jar:2.25.1:compile,Eclipse Public License 2.0,https://jersey.github.io/
 org.glassfish.web:javax.servlet.jsp:jar:2.3.2:compile,Eclipse Public License 2.0,https://jersey.github.io/
 org.glassfish:javax.el:jar:3.0.1-b11:compile,Eclipse Public License 2.0,https://jersey.github.io/
+com.cedarsoftware:java-util:jar:1.9.0:compile,ASLv2,https://github.com/jdereg/java-util
+com.cedarsoftware:json-io:jar:2.5.1:compile,ASLv2,https://github.com/jdereg/json-io
+de.ruedigermoeller:fst:jar:2.50:compile,ASLv2,https://github.com/RuedigerMoeller/fast-serialization
+dnsjava:dnsjava:jar:2.1.7:compile,BSD 2-clause,https://github.com/dnsjava/dnsjava
+org.ehcache:ehcache:jar:3.3.1:compile,ASLv2,https://www.ehcache.org/
diff --git a/dev-utilities/build-utils/list_dependencies.sh b/dev-utilities/build-utils/list_dependencies.sh
index 4e81b7b..04f3fdd 100755
--- a/dev-utilities/build-utils/list_dependencies.sh
+++ b/dev-utilities/build-utils/list_dependencies.sh
@@ -16,4 +16,4 @@
 #  limitations under the License.
 #
 
-{ mvn dependency:list || { echo "ERROR:  Failed to run mvn dependency:list" ; exit 1 ; } ; mvn dependency:list -PHDP-2.5.0.0 || { echo "ERROR:  Failed to run mvn dependency:list -PHDP-2.5.0.0" ; exit 1 ; } ; } | grep "^\[INFO\]   " | awk '{print $2}' | grep -v "org.apache" | grep -v "test" | grep -v "provided" | grep -v "runtime" | grep -v ":system" |  sort | uniq
+{ mvn dependency:list || { echo "ERROR:  Failed to run mvn dependency:list" ; exit 1 ; } ; mvn dependency:list -PHDP-3.1 || { echo "ERROR:  Failed to run mvn dependency:list -PHDP-3.1" ; exit 1 ; } ; } | grep "^\[INFO\]   " | awk '{print $2}' | grep -v "org.apache" | grep -v "test" | grep -v "provided" | grep -v "runtime" | grep -v ":system" |  sort | uniq
diff --git a/metron-analytics/metron-maas-service/pom.xml b/metron-analytics/metron-maas-service/pom.xml
index 4e81be6..16560ef 100644
--- a/metron-analytics/metron-maas-service/pom.xml
+++ b/metron-analytics/metron-maas-service/pom.xml
@@ -28,7 +28,7 @@
     <slf4j.version>1.7.7</slf4j.version>
     <storm.hdfs.version>0.1.2</storm.hdfs.version>
     <guava.version>${global_guava_version}</guava.version>
-    <hadoop.version>2.7.1</hadoop.version>
+    <hadoop.version>${global_hadoop_version}</hadoop.version>
     <jackson.version>1.9.13</jackson.version>
   </properties>
   <dependencies>
@@ -87,12 +87,12 @@
     <dependency>
       <groupId>com.sun.jersey.contribs</groupId>
       <artifactId>jersey-guice</artifactId>
-      <version>1.9</version>
+      <version>1.19</version>
     </dependency>
     <dependency>
       <groupId>com.sun.jersey</groupId>
       <artifactId>jersey-client</artifactId>
-      <version>1.9</version>
+      <version>1.19</version>
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
@@ -308,6 +308,16 @@
                   </excludes>
                 </filter>
               </filters>
+              <relocations>
+                  <relocation>
+                      <pattern>com.google.common</pattern>
+                      <shadedPattern>org.apache.metron.guava.${guava.version}</shadedPattern>
+                  </relocation>
+                  <relocation>
+                      <pattern>com.google.thirdparty</pattern>
+                      <shadedPattern>org.apache.metron.guava.thirdparty.${guava.version}</shadedPattern>
+                  </relocation>
+              </relocations>
               <transformers>
                 <transformer
                   implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java
index dbab00f..9a27fc0 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java
@@ -533,6 +533,7 @@ public class ApplicationMaster {
         }
       }
       else if(request.getAction() == Action.REMOVE) {
+        LOG.info("Removing containers");
         listener.removeContainers(request.getNumInstances(), request);
       }
     }
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/callback/ContainerRequestListener.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/callback/ContainerRequestListener.java
index 0a957e3..daae495 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/callback/ContainerRequestListener.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/callback/ContainerRequestListener.java
@@ -66,6 +66,7 @@ public class ContainerRequestListener implements AMRMClientAsync.CallbackHandler
                         , ServiceDiscoverer serviceDiscoverer
                         )
   {
+    LOG.info("Initializing container request listener");
     this.nmClient = nmClient;
     this.amRMClient = amRMClient;
     this.serviceDiscoverer = serviceDiscoverer;
@@ -74,11 +75,14 @@ public class ContainerRequestListener implements AMRMClientAsync.CallbackHandler
 
 
   public void removeContainers(int number, ModelRequest request) {
+    LOG.info("Making request to remove container");
     int i = 0;
     for(Container c : state.getList(request)) {
       if(i < number) {
+        LOG.info(String.format("Making request to removing container id: %s, node: %s", c.getId(), c.getNodeId()));
         amRMClient.releaseAssignedContainer(c.getId());
         nmClient.stopContainerAsync(c.getId(), c.getNodeId());
+        LOG.info(String.format("Done making request to removing container id: %s, node: %s", c.getId(), c.getNodeId()));
       }
       else {
         break;
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
index fcae40a..0aac885 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
@@ -20,7 +20,22 @@ package org.apache.metron.maas.submit;
 import com.google.common.base.Joiner;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
-import org.apache.commons.cli.*;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
 import org.apache.commons.io.IOUtils;
 import org.apache.curator.RetryPolicy;
 import org.apache.curator.framework.CuratorFramework;
@@ -30,24 +45,17 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.metron.maas.config.*;
+import org.apache.metron.maas.config.Action;
+import org.apache.metron.maas.config.MaaSConfig;
+import org.apache.metron.maas.config.Model;
+import org.apache.metron.maas.config.ModelEndpoint;
+import org.apache.metron.maas.config.ModelRequest;
 import org.apache.metron.maas.discovery.ServiceDiscoverer;
+import org.apache.metron.maas.queue.Queue;
+import org.apache.metron.maas.queue.ZKQueue;
 import org.apache.metron.maas.service.Constants;
 import org.apache.metron.maas.service.Log4jPropertyHelper;
 import org.apache.metron.maas.util.ConfigUtil;
-import org.apache.metron.maas.queue.Queue;
-import org.apache.metron.maas.queue.ZKQueue;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
 
 public class ModelSubmission {
   public enum ModelSubmissionOptions {
diff --git a/metron-analytics/metron-maas-service/src/test/java/org/apache/metron/maas/service/MaasIntegrationTest.java b/metron-analytics/metron-maas-service/src/test/java/org/apache/metron/maas/service/MaasIntegrationTest.java
index 8752850..3288296 100644
--- a/metron-analytics/metron-maas-service/src/test/java/org/apache/metron/maas/service/MaasIntegrationTest.java
+++ b/metron-analytics/metron-maas-service/src/test/java/org/apache/metron/maas/service/MaasIntegrationTest.java
@@ -16,18 +16,23 @@
  * limitations under the License.
  */
 package org.apache.metron.maas.service;
-import java.io.*;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
 import java.net.HttpURLConnection;
 import java.net.InetAddress;
 import java.net.URL;
 import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.logging.Level;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.curator.RetryPolicy;
@@ -35,6 +40,7 @@ import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.Shell;
@@ -45,10 +51,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.metron.integration.ComponentRunner;
 import org.apache.metron.integration.components.YarnComponent;
 import org.apache.metron.integration.components.ZKServerComponent;
-import org.apache.metron.maas.discovery.ServiceDiscoverer;
 import org.apache.metron.maas.config.MaaSConfig;
 import org.apache.metron.maas.config.Model;
 import org.apache.metron.maas.config.ModelEndpoint;
+import org.apache.metron.maas.discovery.ServiceDiscoverer;
 import org.apache.metron.maas.queue.ZKQueue;
 import org.apache.metron.maas.submit.ModelSubmission;
 import org.apache.metron.maas.util.ConfigUtil;
@@ -71,6 +77,7 @@ public class MaasIntegrationTest {
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
     UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
+    UnitTestHelper.setLog4jLevel(org.apache.log4j.Level.ERROR);
     LOG.info("Starting up YARN cluster");
 
     zkServerComponent = new ZKServerComponent();
@@ -180,10 +187,12 @@ public class MaasIntegrationTest {
       List<ApplicationReport> apps = yarnClient.getApplications();
       if (apps.size() == 0 ) {
         Thread.sleep(10);
+        LOG.info("No YARN apps found yet, retrying.");
         continue;
       }
       ApplicationReport appReport = apps.get(0);
       if(appReport.getHost().equals("N/A")) {
+        LOG.info("YARN apps found but not ready yet, retrying.");
         Thread.sleep(10);
         continue;
       }
@@ -193,8 +202,10 @@ public class MaasIntegrationTest {
                       + appReport.getRpcPort() + "'.";
       if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) {
         verified = true;
+        LOG.info("Yarn app verified");
       }
       if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) {
+        LOG.info("Yarn app state returned FINISHED");
         break;
       }
     }
@@ -223,14 +234,17 @@ public class MaasIntegrationTest {
           try {
             List<ModelEndpoint> endpoints = discoverer.getEndpoints(new Model("dummy", "1.0"));
             if (endpoints != null && endpoints.size() == 1) {
-              LOG.trace("Found endpoints: " + endpoints.get(0));
+              LOG.info("Found endpoints: " + endpoints.get(0));
               String output = makeRESTcall(new URL(endpoints.get(0).getEndpoint().getUrl() + "/echo/casey"));
               if (output.contains("casey")) {
                 passed = true;
                 break;
               }
+            } else {
+              LOG.info("Did not find endpoints, retrying");
             }
           } catch (Exception e) {
+            LOG.info("Rest call failed", e);
           }
           Thread.sleep(2000);
         }
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index da8356e..8a896cf 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -29,6 +29,27 @@
     </properties>
     <dependencies>
         <dependency>
+            <!-- prevents Spark from pulling in newer versions of jackson
+                (1.9.x) which conflict with metron's ${global_jackson_version} -->
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
+            <!-- prevents Spark from pulling in newer versions of jackson
+                (1.9.x) which conflict with metron's ${global_jackson_version} -->
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-annotations</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
+            <!-- prevents Spark from pulling in newer versions of jackson
+                (1.9.x) which conflict with metron's ${global_jackson_version} -->
+            <groupId>com.fasterxml.jackson.module</groupId>
+            <artifactId>jackson-module-scala_2.11</artifactId>
+            <version>${global_jackson_version}</version>
+        </dependency>
+        <dependency>
           <groupId>org.apache.spark</groupId>
           <artifactId>spark-core_2.11</artifactId>
           <version>${global_spark_version}</version>
@@ -198,6 +219,12 @@
                                     <pattern>com.tdunning</pattern>
                                     <shadedPattern>org.apache.metron.tdunning</shadedPattern>
                                 </relocation>
+                                <!-- We have conflicts with Spark's version of Jackson. This avoids that issue by
+                                     by relocating the packages. -->
+                                <relocation>
+                                    <pattern>com.fasterxml.jackson</pattern>
+                                    <shadedPattern>org.apache.metron.jackson.${global_jackson_version}</shadedPattern>
+                                </relocation>
                             </relocations>
                             <artifactSet>
                                 <excludes>
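
The Jackson version pins and the com.fasterxml.jackson relocation above are both aimed at keeping a single Jackson version in play for the Spark profiler. As a minimal sketch (hypothetical class name, not part of this commit), the jackson-core version actually resolved on the pre-shading classpath can be printed like this:

    import com.fasterxml.jackson.core.json.PackageVersion;

    public class JacksonVersionCheck {
        public static void main(String[] args) {
            // Reports the jackson-core artifact on the classpath; with the pins above it
            // should match ${global_jackson_version} rather than the version Spark would
            // otherwise bring in transitively.
            System.out.println("jackson-core: " + PackageVersion.VERSION);
        }
    }
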
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
index c43c661..a2ac4c6 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets;
 import oi.thekraken.grok.api.Grok;
 import oi.thekraken.grok.api.Match;
 import org.apache.commons.io.IOUtils;
-import org.apache.directory.api.util.Strings;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.metron.rest.RestException;
 import org.apache.metron.rest.model.GrokValidation;
@@ -68,7 +68,7 @@ public class GrokServiceImpl implements GrokService {
             if (grokValidation.getPatternLabel() == null) {
               throw new RestException("Pattern label is required");
             }
-            if (Strings.isEmpty(grokValidation.getStatement())) {
+            if (StringUtils.isEmpty(grokValidation.getStatement())) {
               throw new RestException("Grok statement is required");
             }
             Grok grok = new Grok();
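
The change above replaces org.apache.directory.api.util.Strings.isEmpty with the commons-lang3 equivalent. For reference, a minimal sketch (hypothetical class name, not part of this commit) of the null-safe behavior StringUtils.isEmpty provides for this validation:

    import org.apache.commons.lang3.StringUtils;

    public class IsEmptyDemo {
        public static void main(String[] args) {
            // isEmpty treats null and "" as empty; anything else, including
            // whitespace-only strings, is considered non-empty.
            System.out.println(StringUtils.isEmpty(null));   // true
            System.out.println(StringUtils.isEmpty(""));     // true
            System.out.println(StringUtils.isEmpty("   "));  // false
            System.out.println(StringUtils.isEmpty("%{GREEDYDATA:msg}")); // false
        }
    }
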
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 5d6ddaa..b4114cc 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -348,11 +348,6 @@
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>commons-beanutils</groupId>
-            <artifactId>commons-beanutils</artifactId>
-            <version>1.8.3</version>
-        </dependency>
-        <dependency>
             <groupId>org.reflections</groupId>
             <artifactId>reflections</artifactId>
             <version>0.9.10</version>
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
new file mode 100644
index 0000000..1a4a218
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -0,0 +1,899 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import com.google.protobuf.CodedOutputStream;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputHelper {
+  private static final Logger LOG =
+          LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);
+
+  private FanOutOneBlockAsyncDFSOutputHelper() {
+  }
+
+  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
+
+  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
+  // use pooled allocator for performance.
+  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
+
+  // copied from DFSPacket since it is package private.
+  public static final long HEART_BEAT_SEQNO = -1L;
+
+  // Timeouts for communicating with DataNode for streaming writes/reads
+  public static final int READ_TIMEOUT = 60 * 1000;
+
+  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
+
+  // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
+  // getStatus method, and for hadoop 2.7 or after, the status is retrieved from flag. The flag may
+  // get from proto directly, or combined by the reply field of the proto and a ECN object. See
+  // createPipelineAckStatusGetter for more details.
+  private interface PipelineAckStatusGetter {
+    Status get(PipelineAckProto ack);
+  }
+
+  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
+
+  // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
+  // we need to use reflection to set it.See createStorageTypeSetter for more details.
+  private interface StorageTypeSetter {
+    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
+  }
+
+  private static final StorageTypeSetter STORAGE_TYPE_SETTER;
+
+  // helper class for calling add block method on namenode. There is a addBlockFlags parameter for
+  // hadoop 2.8 or later. See createBlockAdder for more details.
+  private interface BlockAdder {
+
+    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                          ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+            throws IOException;
+  }
+
+  private static final BlockAdder BLOCK_ADDER;
+
+  private interface LeaseManager {
+
+    void begin(DFSClient client, long inodeId);
+
+    void end(DFSClient client, long inodeId);
+  }
+
+  private static final LeaseManager LEASE_MANAGER;
+
+  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
+  // isClientRunning is not public so we need to use reflection.
+  private interface DFSClientAdaptor {
+
+    boolean isClientRunning(DFSClient client);
+  }
+
+  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
+
+  // helper class for convert protos.
+  private interface PBHelper {
+
+    ExtendedBlockProto convert(ExtendedBlock b);
+
+    TokenProto convert(Token<?> tok);
+  }
+
+  private static final PBHelper PB_HELPER;
+
+  // helper class for creating data checksum.
+  private interface ChecksumCreater {
+    DataChecksum createChecksum(DFSClient client);
+  }
+
+  private static final ChecksumCreater CHECKSUM_CREATER;
+
+  // helper class for creating files.
+  private interface FileCreator {
+    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
+                                  String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
+                                  short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
+            throws Exception {
+      try {
+        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
+                replication, blockSize, supportedVersions);
+      } catch (InvocationTargetException e) {
+        if (e.getCause() instanceof Exception) {
+          throw (Exception) e.getCause();
+        } else {
+          throw new RuntimeException(e.getCause());
+        }
+      }
+    };
+
+    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
+                        EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
+                        CryptoProtocolVersion[] supportedVersions) throws Exception;
+  }
+
+  private static final FileCreator FILE_CREATOR;
+
+  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+    isClientRunningMethod.setAccessible(true);
+    return new DFSClientAdaptor() {
+
+      @Override
+      public boolean isClientRunning(DFSClient client) {
+        try {
+          return (Boolean) isClientRunningMethod.invoke(client);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+    Method beginFileLeaseMethod =
+            DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
+    beginFileLeaseMethod.setAccessible(true);
+    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+    endFileLeaseMethod.setAccessible(true);
+    return new LeaseManager() {
+
+      @Override
+      public void begin(DFSClient client, long inodeId) {
+        try {
+          beginFileLeaseMethod.invoke(client, inodeId, null);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void end(DFSClient client, long inodeId) {
+        try {
+          endFileLeaseMethod.invoke(client, inodeId);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+          throws NoSuchMethodException {
+    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+    @SuppressWarnings("rawtypes")
+    Class<? extends Enum> ecnClass;
+    try {
+      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+              .asSubclass(Enum.class);
+    } catch (ClassNotFoundException e) {
+      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
+              "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+              "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+    @SuppressWarnings("unchecked")
+    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+    Method combineHeaderMethod =
+            PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+    Method getStatusFromHeaderMethod =
+            PipelineAck.class.getMethod("getStatusFromHeader", int.class);
+    return new PipelineAckStatusGetter() {
+
+      @Override
+      public Status get(PipelineAckProto ack) {
+        try {
+          @SuppressWarnings("unchecked")
+          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
+          Integer headerFlag;
+          if (flagList.isEmpty()) {
+            Status reply = (Status) getReplyMethod.invoke(ack, 0);
+            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
+          } else {
+            headerFlag = flagList.get(0);
+          }
+          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+          throws NoSuchMethodException {
+    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+    return new PipelineAckStatusGetter() {
+
+      @Override
+      public Status get(PipelineAckProto ack) {
+        try {
+          return (Status) getStatusMethod.invoke(ack, 0);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+          throws NoSuchMethodException {
+    try {
+      return createPipelineAckStatusGetter27();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("Can not get expected method " + e.getMessage() +
+              ", this usually because your Hadoop is pre 2.7.0, " +
+              "try the methods in Hadoop 2.6.x instead.");
+    }
+    return createPipelineAckStatusGetter26();
+  }
+
+  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+    Method setStorageTypeMethod =
+            OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
+    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
+      builder.put(storageTypeProto.name(), storageTypeProto);
+    }
+    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
+    return new StorageTypeSetter() {
+
+      @Override
+      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
+        Object protoEnum = name2ProtoEnum.get(storageType.name());
+        try {
+          setStorageTypeMethod.invoke(builder, protoEnum);
+        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+        return builder;
+      }
+    };
+  }
+
+  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+    for (Method method : ClientProtocol.class.getMethods()) {
+      if (method.getName().equals("addBlock")) {
+        Method addBlockMethod = method;
+        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+        if (paramTypes[paramTypes.length - 1] == String[].class) {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                                         ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                                         String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                        excludeNodes, fileId, favoredNodes);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        } else {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                                         ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                                         String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                        excludeNodes, fileId, favoredNodes, null);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        }
+      }
+    }
+    throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+  }
+
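+  // Prefers the PBHelperClient class (Hadoop 2.8+) and falls back to PBHelper on older versions;
+  // both expose static convert(ExtendedBlock) and convert(Token) methods that are looked up
+  // reflectively.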
+  private static PBHelper createPBHelper() throws NoSuchMethodException {
+    Class<?> helperClass;
+    String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
+    try {
+      helperClass = Class.forName(clazzName);
+    } catch (ClassNotFoundException e) {
+      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+      LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
+              helperClass.toString() + " instead.");
+    }
+    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+    return new PBHelper() {
+
+      @Override
+      public ExtendedBlockProto convert(ExtendedBlock b) {
+        try {
+          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public TokenProto convert(Token<?> tok) {
+        try {
+          return (TokenProto) convertTokenMethod.invoke(null, tok);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
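+  // Two ways to obtain a DataChecksum from the client configuration: Hadoop 2.8+ exposes a
+  // createChecksum method on DfsClientConf (invoked here with a null argument), while Hadoop 2.7
+  // keeps a private no-arg DFSClient$Conf#createChecksum(); both are reached via DFSClient#getConf().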
+  private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
+          throws NoSuchMethodException {
+    for (Method method : confClass.getMethods()) {
+      if (method.getName().equals("createChecksum")) {
+        Method createChecksumMethod = method;
+        return new ChecksumCreater() {
+
+          @Override
+          public DataChecksum createChecksum(DFSClient client) {
+            try {
+              return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
+                      (Object) null);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      }
+    }
+    throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+  }
+
+  private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
+          throws NoSuchMethodException {
+    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+    createChecksumMethod.setAccessible(true);
+    return new ChecksumCreater() {
+
+      @Override
+      public DataChecksum createChecksum(DFSClient client) {
+        try {
+          return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static ChecksumCreater createChecksumCreater()
+          throws NoSuchMethodException, ClassNotFoundException {
+    Method getConfMethod = DFSClient.class.getMethod("getConf");
+    try {
+      return createChecksumCreater28(getConfMethod,
+              Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
+    } catch (ClassNotFoundException e) {
+      LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
+    }
+    return createChecksumCreater27(getConfMethod,
+            Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
+  }
+
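+  // ClientProtocol#create gained an extra trailing String parameter in Hadoop 3; the Hadoop 3
+  // variant passes null for it, while the Hadoop 2 variant uses the 8-argument signature directly.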
+  private static FileCreator createFileCreator3() throws NoSuchMethodException {
+    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+            String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+            CryptoProtocolVersion[].class, String.class);
+
+    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+            supportedVersions) -> {
+      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
+              createParent, replication, blockSize, supportedVersions, null);
+    };
+  }
+
+  private static FileCreator createFileCreator2() throws NoSuchMethodException {
+    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+            String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+            CryptoProtocolVersion[].class);
+
+    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+            supportedVersions) -> {
+      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
+              createParent, replication, blockSize, supportedVersions);
+    };
+  }
+
+  private static FileCreator createFileCreator() throws NoSuchMethodException {
+    try {
+      return createFileCreator3();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
+    }
+    return createFileCreator2();
+  }
+
+  // cancel the processing if DFSClient is already closed.
+  static final class CancelOnClose implements CancelableProgressable {
+
+    private final DFSClient client;
+
+    public CancelOnClose(DFSClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public boolean progress() {
+      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
+    }
+  }
+
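+  // Resolve all version-dependent adaptors once at class load time; if HDFS internals have changed
+  // in an incompatible way, fail fast with an Error rather than at write time.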
+  static {
+    try {
+      PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
+      STORAGE_TYPE_SETTER = createStorageTypeSetter();
+      BLOCK_ADDER = createBlockAdder();
+      LEASE_MANAGER = createLeaseManager();
+      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+      PB_HELPER = createPBHelper();
+      CHECKSUM_CREATER = createChecksumCreater();
+      FILE_CREATOR = createFileCreator();
+    } catch (Exception e) {
+      String msg = "Couldn't properly initialize access to HDFS internals. Please " +
+              "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+              "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+  }
+
+  static void beginFileLease(DFSClient client, long inodeId) {
+    LEASE_MANAGER.begin(client, inodeId);
+  }
+
+  static void endFileLease(DFSClient client, long inodeId) {
+    LEASE_MANAGER.end(client, inodeId);
+  }
+
+  static DataChecksum createChecksum(DFSClient client) {
+    return CHECKSUM_CREATER.createChecksum(client);
+  }
+
+  static Status getStatus(PipelineAckProto ack) {
+    return PIPELINE_ACK_STATUS_GETTER.get(ack);
+  }
+
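+  // Installs the response-side handlers on the datanode channel: an IdleStateHandler for the read
+  // timeout, protobuf varint framing and decoding for BlockOpResponseProto, and an inbound handler
+  // that completes the promise with the channel on SUCCESS, or fails it on an error status, an
+  // access token error, connection close, or timeout.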
+  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
+                                                Promise<Channel> promise, int timeoutMs) {
+    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+            new ProtobufVarint32FrameDecoder(),
+            new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
+            new SimpleChannelInboundHandler<BlockOpResponseProto>() {
+
+              @Override
+              protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
+                      throws Exception {
+                Status pipelineStatus = resp.getStatus();
+                if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
+                  throw new IOException("datanode " + dnInfo + " is restarting");
+                }
+                String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
+                if (resp.getStatus() != Status.SUCCESS) {
+                  if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+                    throw new InvalidBlockTokenException("Got access token error" + ", status message " +
+                            resp.getMessage() + ", " + logInfo);
+                  } else {
+                    throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
+                            ", status message " + resp.getMessage() + ", " + logInfo);
+                  }
+                }
+                // success
+                ChannelPipeline p = ctx.pipeline();
+                for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
+                  // do not remove all handlers because we may have wrap or unwrap handlers at the
+                  // head of the pipeline.
+                  if (handler instanceof IdleStateHandler) {
+                    break;
+                  }
+                }
+                // Disable auto read here. Enable it after we setup the streaming pipeline in
+                // FanOutOneBlockAsyncDFSOutput.
+                ctx.channel().config().setAutoRead(false);
+                promise.trySuccess(ctx.channel());
+              }
+
+              @Override
+              public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+                promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
+              }
+
+              @Override
+              public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+                if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+                  promise
+                          .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+                } else {
+                  super.userEventTriggered(ctx, evt);
+                }
+              }
+
+              @Override
+              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                promise.tryFailure(cause);
+              }
+            });
+  }
+
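+  // Sends the WRITE_BLOCK request: a 2-byte data transfer version, a 1-byte opcode, then the
+  // varint-delimited OpWriteBlockProto with the storage type filled in.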
+  private static void requestWriteBlock(Channel channel, Enum<?> storageType,
+                                        OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
+    OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
+    int protoLen = proto.getSerializedSize();
+    ByteBuf buffer =
+            channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+    buffer.writeByte(Op.WRITE_BLOCK.code);
+    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
+    channel.writeAndFlush(buffer);
+  }
+
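+  // Runs SASL negotiation with the datanode first; only after it succeeds is the response pipeline
+  // installed and the write-block request sent.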
+  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+                                 Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+                                 DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
+          throws IOException {
+    Promise<Void> saslPromise = channel.eventLoop().newPromise();
+    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
+    saslPromise.addListener(new FutureListener<Void>() {
+
+      @Override
+      public void operationComplete(Future<Void> future) throws Exception {
+        if (future.isSuccess()) {
+          // setup response processing pipeline first, then send request.
+          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
+          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
+        } else {
+          promise.tryFailure(future.cause());
+        }
+      }
+    });
+  }
+
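+  // Opens one netty channel per datanode in the located block, builds the shared
+  // OpWriteBlockProto, and returns a future per datanode that completes once the connection,
+  // SASL negotiation and write-block handshake have finished.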
+  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
+                                                          String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
+                                                          BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
+                                                          Class<? extends Channel> channelClass) {
+    Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
+    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
+    boolean connectToDnViaHostname =
+            conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
+    blockCopy.setNumBytes(locatedBlock.getBlockSize());
+    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
+            .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
+                    .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
+            .setClientName(clientName).build();
+    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
+    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+            .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+            .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+            .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
+            .setRequestedChecksum(checksumProto)
+            .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
+    for (int i = 0; i < datanodeInfos.length; i++) {
+      DatanodeInfo dnInfo = datanodeInfos[i];
+      Enum<?> storageType = storageTypes[i];
+      Promise<Channel> promise = eventLoopGroup.next().newPromise();
+      futureList.add(promise);
+      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
+      new Bootstrap().group(eventLoopGroup).channel(channelClass)
+              .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
+
+        @Override
+        protected void initChannel(Channel ch) throws Exception {
+          // we need the remote address of the channel, so we can only move on after the channel
+          // has connected. Leave an empty implementation here because netty does not allow
+          // a null handler.
+        }
+      }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (future.isSuccess()) {
+            initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
+                    timeoutMs, client, locatedBlock.getBlockToken(), promise);
+          } else {
+            promise.tryFailure(future.cause());
+          }
+        }
+      });
+    }
+    return futureList;
+  }
+
+  /**
+   * Exception other than RemoteException thrown when calling create on namenode
+   */
+  public static class NameNodeException extends IOException {
+
+    private static final long serialVersionUID = 3143237406477095390L;
+
+    public NameNodeException(Throwable cause) {
+      super(cause);
+    }
+  }
+
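+  // Core create loop: create the file at the namenode, begin the lease, add the first block,
+  // connect to every datanode in the pipeline, and wrap the result in a FanOutOneBlockAsyncDFSOutput.
+  // Broken datanodes are excluded on the next attempt, retriable namenode errors are retried up to
+  // the configured maximum, and the lease is released whenever creation does not succeed.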
+  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
+                                                           boolean overwrite, boolean createParent, short replication, long blockSize,
+                                                           EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+    Configuration conf = dfs.getConf();
+    FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+    DFSClient client = dfs.getClient();
+    String clientName = client.getClientName();
+    ClientProtocol namenode = client.getNamenode();
+    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+            DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+    for (int retry = 0;; retry++) {
+      HdfsFileStatus stat;
+      try {
+        stat = FILE_CREATOR.create(namenode, src,
+                FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+                new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+                createParent, replication, blockSize, CryptoProtocolVersion.supported());
+      } catch (Exception e) {
+        if (e instanceof RemoteException) {
+          throw (RemoteException) e;
+        } else {
+          throw new NameNodeException(e);
+        }
+      }
+      beginFileLease(client, stat.getFileId());
+      boolean succ = false;
+      LocatedBlock locatedBlock = null;
+      List<Future<Channel>> futureList = null;
+      try {
+        DataChecksum summer = createChecksum(client);
+        locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
+                excludesNodes, stat.getFileId(), null);
+        List<Channel> datanodeList = new ArrayList<>();
+        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+                PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
+        for (int i = 0, n = futureList.size(); i < n; i++) {
+          try {
+            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+          } catch (Exception e) {
+            // exclude the broken DN next time
+            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+            throw e;
+          }
+        }
+        Encryptor encryptor = createEncryptor(conf, stat, client);
+        FanOutOneBlockAsyncDFSOutput output =
+                new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+                        stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+        succ = true;
+        return output;
+      } catch (RemoteException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (shouldRetryCreate(e)) {
+          if (retry >= createMaxRetries) {
+            throw e.unwrapRemoteException();
+          }
+        } else {
+          throw e.unwrapRemoteException();
+        }
+      } catch (IOException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (retry >= createMaxRetries) {
+          throw e;
+        }
+        // overwrite the old broken file.
+        overwrite = true;
+        try {
+          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+        } catch (InterruptedException ie) {
+          throw new InterruptedIOException();
+        }
+      } finally {
+        if (!succ) {
+          if (futureList != null) {
+            for (Future<Channel> f : futureList) {
+              f.addListener(new FutureListener<Channel>() {
+
+                @Override
+                public void operationComplete(Future<Channel> future) throws Exception {
+                  if (future.isSuccess()) {
+                    future.getNow().close();
+                  }
+                }
+              });
+            }
+          }
+          endFileLease(client, stat.getFileId());
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a {@link FanOutOneBlockAsyncDFSOutput}. This method may block, so do not call it
+   * inside an {@link EventLoop}.
+   */
+  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
+                                                          boolean overwrite, boolean createParent, short replication, long blockSize,
+                                                          EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
+
+      @Override
+      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
+              throws IOException, UnresolvedLinkException {
+        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
+                blockSize, eventLoopGroup, channelClass);
+      }
+
+      @Override
+      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+    }.resolve(dfs, f);
+  }
+
+  public static boolean shouldRetryCreate(RemoteException e) {
+    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
+    // For exceptions other than this, we just rethrow them. This is the same as
+    // DFSOutputStream.newStreamForCreate.
+    return e.getClassName().endsWith("RetryStartFileException");
+  }
+
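+  // Retries namenode.complete() until the namenode acknowledges the file, ending the lease on
+  // success and giving up if the lease has already expired.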
+  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
+                           ExtendedBlock block, long fileId) {
+    for (int retry = 0;; retry++) {
+      try {
+        if (namenode.complete(src, clientName, block, fileId)) {
+          endFileLease(client, fileId);
+          return;
+        } else {
+          LOG.warn("complete file " + src + " not finished, retry = " + retry);
+        }
+      } catch (RemoteException e) {
+        IOException ioe = e.unwrapRemoteException();
+        if (ioe instanceof LeaseExpiredException) {
+          LOG.warn("lease for file " + src + " is expired, give up", e);
+          return;
+        } else {
+          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+        }
+      } catch (Exception e) {
+        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+      }
+      sleepIgnoreInterrupt(retry);
+    }
+  }
+
+  static void sleepIgnoreInterrupt(int retry) {
+    try {
+      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+    } catch (InterruptedException e) {
+    }
+  }
+}
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
new file mode 100644
index 0000000..b4664d4
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -0,0 +1,782 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.Decryptor;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
+  private static final Logger LOG =
+          LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
+
+  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
+  }
+
+  private static final String SERVER_NAME = "0";
+  private static final String PROTOCOL = "hdfs";
+  private static final String MECHANISM = "DIGEST-MD5";
+  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+  private static final String NAME_DELIMITER = " ";
+
+  private interface SaslAdaptor {
+
+    TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
+
+    SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
+
+    AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
+  }
+
+  private static final SaslAdaptor SASL_ADAPTOR;
+
+  private interface TransparentCryptoHelper {
+
+    Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
+            throws IOException;
+  }
+
+  private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
+
+  private static SaslAdaptor createSaslAdaptor()
+          throws NoSuchFieldException, NoSuchMethodException {
+    Field saslPropsResolverField =
+            SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
+    saslPropsResolverField.setAccessible(true);
+    Field trustedChannelResolverField =
+            SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
+    trustedChannelResolverField.setAccessible(true);
+    Field fallbackToSimpleAuthField =
+            SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
+    fallbackToSimpleAuthField.setAccessible(true);
+    return new SaslAdaptor() {
+
+      @Override
+      public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
+        try {
+          return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
+        try {
+          return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
+        try {
+          return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
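+  // Two ways to decrypt the per-file encryption key for transparent encryption: before HDFS-12396
+  // the private DFSClient#decryptEncryptedDataEncryptionKey method is used; afterwards the logic
+  // lives in HdfsKMSUtil and takes the client's KeyProvider explicitly.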
+  private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
+          throws NoSuchMethodException {
+    Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
+            .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
+    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
+    return new TransparentCryptoHelper() {
+
+      @Override
+      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+                                       DFSClient client) throws IOException {
+        try {
+          KeyVersion decryptedKey =
+                  (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
+          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+          Encryptor encryptor = cryptoCodec.createEncryptor();
+          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
+          return encryptor;
+        } catch (InvocationTargetException e) {
+          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+          throw new RuntimeException(e.getTargetException());
+        } catch (GeneralSecurityException e) {
+          throw new IOException(e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
+          throws ClassNotFoundException, NoSuchMethodException {
+    Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
+    Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
+            "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
+    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
+    return new TransparentCryptoHelper() {
+
+      @Override
+      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+                                       DFSClient client) throws IOException {
+        try {
+          KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod
+                  .invoke(null, feInfo, client.getKeyProvider());
+          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+          Encryptor encryptor = cryptoCodec.createEncryptor();
+          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
+          return encryptor;
+        } catch (InvocationTargetException e) {
+          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+          throw new RuntimeException(e.getTargetException());
+        } catch (GeneralSecurityException e) {
+          throw new IOException(e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelper()
+          throws NoSuchMethodException, ClassNotFoundException {
+    try {
+      return createTransparentCryptoHelperWithoutHDFS12396();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
+              " should be hadoop version with HDFS-12396", e);
+    }
+    return createTransparentCryptoHelperWithHDFS12396();
+  }
+
+  static {
+    try {
+      SASL_ADAPTOR = createSaslAdaptor();
+      TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+    } catch (Exception e) {
+      String msg = "Couldn't properly initialize access to HDFS internals. Please "
+              + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+              + "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+  }
+
+  /**
+   * Sets user name and password when asked by the client-side SASL object.
+   */
+  private static final class SaslClientCallbackHandler implements CallbackHandler {
+
+    private final char[] password;
+    private final String userName;
+
+    /**
+     * Creates a new SaslClientCallbackHandler.
+     * @param userName SASL user name
+     * @param password SASL password
+     */
+    public SaslClientCallbackHandler(String userName, char[] password) {
+      this.password = password;
+      this.userName = userName;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        pc.setPassword(password);
+      }
+      if (rc != null) {
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+
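+  // Channel handler that drives the client side of the DataTransferProtocol SASL handshake:
+  // it sends the magic number and an empty initial message, answers the datanode's challenges
+  // with DIGEST-MD5, and on completion replaces itself with the negotiated encryption or
+  // wrap/unwrap handlers (or none, for auth-only QOP).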
+  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
+
+    private final Configuration conf;
+
+    private final Map<String, String> saslProps;
+
+    private final SaslClient saslClient;
+
+    private final int timeoutMs;
+
+    private final Promise<Void> promise;
+
+    private final DFSClient dfsClient;
+
+    private int step = 0;
+
+    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
+                                Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
+                                DFSClient dfsClient) throws SaslException {
+      this.conf = conf;
+      this.saslProps = saslProps;
+      this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
+              SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
+      this.timeoutMs = timeoutMs;
+      this.promise = promise;
+      this.dfsClient = dfsClient;
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
+      sendSaslMessage(ctx, payload, null);
+    }
+
+    private List<CipherOption> getCipherOptions() throws IOException {
+      // Negotiate cipher suites if configured. Currently, the only supported
+      // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+      // values for future expansion.
+      String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+      if (StringUtils.isBlank(cipherSuites)) {
+        return null;
+      }
+      if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+        throw new IOException(String.format("Invalid cipher suite, %s=%s",
+                DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+      }
+      return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+                                 List<CipherOption> options) throws IOException {
+      DataTransferEncryptorMessageProto.Builder builder =
+              DataTransferEncryptorMessageProto.newBuilder();
+      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+      if (payload != null) {
+        // Was ByteStringer; fix w/o using ByteStringer. It's in hbase-protocol
+        // and we want to keep that out of hbase-server.
+        builder.setPayload(ByteString.copyFrom(payload));
+      }
+      if (options != null) {
+        builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
+      }
+      DataTransferEncryptorMessageProto proto = builder.build();
+      int size = proto.getSerializedSize();
+      size += CodedOutputStream.computeRawVarint32Size(size);
+      ByteBuf buf = ctx.alloc().buffer(size);
+      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
+      ctx.write(buf);
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+      sendSaslMessage(ctx, new byte[0]);
+      ctx.flush();
+      step++;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
+      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+        dfsClient.clearDataEncryptionKey();
+        throw new InvalidEncryptionKeyException(proto.getMessage());
+      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+        throw new IOException(proto.getMessage());
+      }
+    }
+
+    private String getNegotiatedQop() {
+      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    }
+
+    private boolean isNegotiatedQopPrivacy() {
+      String qop = getNegotiatedQop();
+      return qop != null && "auth-conf".equalsIgnoreCase(qop);
+    }
+
+    private boolean requestedQopContainsPrivacy() {
+      Set<String> requestedQop =
+              ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      return requestedQop.contains("auth-conf");
+    }
+
+    private void checkSaslComplete() throws IOException {
+      if (!saslClient.isComplete()) {
+        throw new IOException("Failed to complete SASL handshake");
+      }
+      Set<String> requestedQop =
+              ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      String negotiatedQop = getNegotiatedQop();
+      LOG.debug(
+              "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
+      if (!requestedQop.contains(negotiatedQop)) {
+        throw new IOException(String.format("SASL handshake completed, but "
+                        + "channel does not have acceptable quality of protection, "
+                        + "requested = %s, negotiated = %s",
+                requestedQop, negotiatedQop));
+      }
+    }
+
+    private boolean useWrap() {
+      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+      return qop != null && !"auth".equalsIgnoreCase(qop);
+    }
+
+    private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
+      byte[] inKey = option.getInKey();
+      if (inKey != null) {
+        inKey = saslClient.unwrap(inKey, 0, inKey.length);
+      }
+      byte[] outKey = option.getOutKey();
+      if (outKey != null) {
+        outKey = saslClient.unwrap(outKey, 0, outKey.length);
+      }
+      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
+              option.getOutIv());
+    }
+
+    private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
+                                         boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+      List<CipherOption> cipherOptions =
+              PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
+      if (cipherOptions == null || cipherOptions.isEmpty()) {
+        return null;
+      }
+      CipherOption cipherOption = cipherOptions.get(0);
+      return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+      if (msg instanceof DataTransferEncryptorMessageProto) {
+        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
+        check(proto);
+        byte[] challenge = proto.getPayload().toByteArray();
+        byte[] response = saslClient.evaluateChallenge(challenge);
+        switch (step) {
+          case 1: {
+            List<CipherOption> cipherOptions = null;
+            if (requestedQopContainsPrivacy()) {
+              cipherOptions = getCipherOptions();
+            }
+            sendSaslMessage(ctx, response, cipherOptions);
+            ctx.flush();
+            step++;
+            break;
+          }
+          case 2: {
+            assert response == null;
+            checkSaslComplete();
+            CipherOption cipherOption =
+                    getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+            ChannelPipeline p = ctx.pipeline();
+            while (p.first() != null) {
+              p.removeFirst();
+            }
+            if (cipherOption != null) {
+              CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
+              p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
+                      new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
+            } else {
+              if (useWrap()) {
+                p.addLast(new SaslWrapHandler(saslClient),
+                        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+                        new SaslUnwrapHandler(saslClient));
+              }
+            }
+            promise.trySuccess(null);
+            break;
+          }
+          default:
+            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+        }
+      } else {
+        ctx.fireChannelRead(msg);
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      promise.tryFailure(cause);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+      } else {
+        super.userEventTriggered(ctx, evt);
+      }
+    }
+  }
+
+  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final SaslClient saslClient;
+
+    public SaslUnwrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      msg.skipBytes(4);
+      byte[] b = new byte[msg.readableBytes()];
+      msg.readBytes(b);
+      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
+    }
+  }
+
+  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+    private final SaslClient saslClient;
+
+    private CompositeByteBuf cBuf;
+
+    public SaslWrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+            throws Exception {
+      if (msg instanceof ByteBuf) {
+        ByteBuf buf = (ByteBuf) msg;
+        cBuf.addComponent(buf);
+        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
+      } else {
+        ctx.write(msg);
+      }
+    }
+
+    @Override
+    public void flush(ChannelHandlerContext ctx) throws Exception {
+      if (cBuf.isReadable()) {
+        byte[] b = new byte[cBuf.readableBytes()];
+        cBuf.readBytes(b);
+        cBuf.discardReadComponents();
+        byte[] wrapped = saslClient.wrap(b, 0, b.length);
+        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
+        buf.writeInt(wrapped.length);
+        buf.writeBytes(wrapped);
+        ctx.write(buf);
+      }
+      ctx.flush();
+    }
+
+    @Override
+    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+      cBuf.release();
+      cBuf = null;
+    }
+  }
+
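+  // When a cipher option was negotiated, these handlers encrypt and decrypt the data transfer
+  // stream with the negotiated AES codec instead of SASL wrap/unwrap.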
+  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final Decryptor decryptor;
+
+    public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+            throws GeneralSecurityException, IOException {
+      this.decryptor = codec.createDecryptor();
+      this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
+      ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
+      decryptor.decrypt(inBuffer, outBuffer);
+      outBuf.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+      ctx.fireChannelRead(outBuf);
+    }
+  }
+
+  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
+
+    private final Encryptor encryptor;
+
+    public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+            throws GeneralSecurityException, IOException {
+      this.encryptor = codec.createEncryptor();
+      this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
+    }
+
+    @Override
+    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
+            throws Exception {
+      if (preferDirect) {
+        return ctx.alloc().directBuffer(msg.readableBytes());
+      } else {
+        return ctx.alloc().buffer(msg.readableBytes());
+      }
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
+      encryptor.encrypt(inBuffer, outBuffer);
+      out.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+    }
+  }
+
+  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
+    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
+            + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+  }
+
+  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
+  }
+
+  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8);
+  }
+
+  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8)
+            .toCharArray();
+  }
+
+  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
+    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+    return saslProps;
+  }
+
+  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
+                                        String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
+                                        DFSClient dfsClient) {
+    try {
+      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+              new ProtobufVarint32FrameDecoder(),
+              new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
+              new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise,
+                      dfsClient));
+    } catch (SaslException e) {
+      saslPromise.tryFailure(e);
+    }
+  }
+
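+  // Decides whether and how to negotiate SASL with a datanode: trusted channels skip it, an
+  // available data encryption key triggers an encrypted handshake, unsecured clusters, privileged
+  // ports and simple-auth fallback skip it, and otherwise the block token is used for a general
+  // handshake with the configured SASL properties.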
+  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+                               int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
+                               Promise<Void> saslPromise) throws IOException {
+    SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
+    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
+    TrustedChannelResolver trustedChannelResolver =
+            SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
+    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
+    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
+      saslPromise.trySuccess(null);
+      return;
+    }
+    DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
+    if (encryptionKey != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+                "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
+              encryptionKeyToPassword(encryptionKey.encryptionKey),
+              createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
+              client);
+    } else if (!UserGroupInformation.isSecurityEnabled()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
+                + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (dnInfo.getXferPort() < 1024) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+                + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+                + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (saslPropsResolver != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+                "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
+              buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
+              client);
+    } else {
+      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
+      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
+      // edge case.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
+                + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    }
+  }
+
+  static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
+          throws IOException {
+    FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
+    if (feInfo == null) {
+      return null;
+    }
+    return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
+  }
+}
diff --git a/metron-platform/metron-hbase-server/pom.xml b/metron-platform/metron-hbase-server/pom.xml
index 0f6bd83..120954e 100644
--- a/metron-platform/metron-hbase-server/pom.xml
+++ b/metron-platform/metron-hbase-server/pom.xml
@@ -34,6 +34,12 @@
         <guava.version>${global_hbase_guava_version}</guava.version>
     </properties>
 
+    <!--
+     Metron HBase server is meant to be deployed as a single uber jar. It's expected that
+     all runtime dependencies will be included in the jar or be provided by the environment
+     in which this jar will be run.
+     -->
+
     <dependencies>
 
         <!-- Metron -->
@@ -58,15 +64,33 @@
             </exclusions>
         </dependency>
         <dependency>
+            <!-- We depend on this for some Zookeeper upload operations -->
+            <groupId>org.apache.metron</groupId>
+            <artifactId>stellar-common</artifactId>
+            <version>${project.parent.version}</version>
+            <classifier>uber</classifier>
+        </dependency>
+        <dependency>
             <groupId>org.apache.metron</groupId>
             <artifactId>metron-enrichment-common</artifactId>
             <version>${project.parent.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
-            <artifactId>stellar-common</artifactId>
+            <artifactId>metron-hbase-common</artifactId>
             <version>${project.parent.version}</version>
-            <classifier>uber</classifier>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
 
         <!-- HBase -->
diff --git a/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
new file mode 100644
index 0000000..e82a46c
--- /dev/null
+++ b/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -0,0 +1,899 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputHelper {
+  private static final Logger LOG =
+          LoggerFactory.getLogger(
+              FanOutOneBlockAsyncDFSOutputHelper.class);
+
+  private FanOutOneBlockAsyncDFSOutputHelper() {
+  }
+
+  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
+
+  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
+  // use pooled allocator for performance.
+  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
+
+  // copied from DFSPacket since it is package private.
+  public static final long HEART_BEAT_SEQNO = -1L;
+
+  // Timeouts for communicating with DataNode for streaming writes/reads
+  public static final int READ_TIMEOUT = 60 * 1000;
+
+  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
+
+  // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
+  // getStatus method, and for hadoop 2.7 or after, the status is retrieved from the flag. The flag
+  // may come from the proto directly, or be combined from the reply field of the proto and an ECN
+  // object. See createPipelineAckStatusGetter for more details.
+  private interface PipelineAckStatusGetter {
+    Status get(PipelineAckProto ack);
+  }
+
+  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
+
+  // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
+  // we need to use reflection to set it. See createStorageTypeSetter for more details.
+  private interface StorageTypeSetter {
+    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
+  }
+
+  private static final StorageTypeSetter STORAGE_TYPE_SETTER;
+
+  // helper class for calling the addBlock method on the namenode. There is an addBlockFlags
+  // parameter for hadoop 2.8 or later. See createBlockAdder for more details.
+  private interface BlockAdder {
+
+    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+        ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+            throws IOException;
+  }
+
+  private static final BlockAdder BLOCK_ADDER;
+
+  private interface LeaseManager {
+
+    void begin(DFSClient client, long inodeId);
+
+    void end(DFSClient client, long inodeId);
+  }
+
+  private static final LeaseManager LEASE_MANAGER;
+
+  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
+  // isClientRunning is not public so we need to use reflection.
+  private interface DFSClientAdaptor {
+
+    boolean isClientRunning(DFSClient client);
+  }
+
+  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
+
+  // helper class for converting protos.
+  private interface PBHelper {
+
+    ExtendedBlockProto convert(ExtendedBlock b);
+
+    TokenProto convert(Token<?> tok);
+  }
+
+  private static final PBHelper PB_HELPER;
+
+  // helper class for creating a data checksum.
+  private interface ChecksumCreater {
+    DataChecksum createChecksum(DFSClient client);
+  }
+
+  private static final ChecksumCreater CHECKSUM_CREATER;
+
+  // helper class for creating files.
+  private interface FileCreator {
+    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
+        String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
+        short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
+            throws Exception {
+      try {
+        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
+                replication, blockSize, supportedVersions);
+      } catch (InvocationTargetException e) {
+        if (e.getCause() instanceof Exception) {
+          throw (Exception) e.getCause();
+        } else {
+          throw new RuntimeException(e.getCause());
+        }
+      }
+    };
+
+    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
+        EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
+        CryptoProtocolVersion[] supportedVersions) throws Exception;
+  }
+
+  private static final FileCreator FILE_CREATOR;
+
+  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+    isClientRunningMethod.setAccessible(true);
+    return new DFSClientAdaptor() {
+
+      @Override
+      public boolean isClientRunning(DFSClient client) {
+        try {
+          return (Boolean) isClientRunningMethod.invoke(client);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+    Method beginFileLeaseMethod =
+            DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
+    beginFileLeaseMethod.setAccessible(true);
+    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+    endFileLeaseMethod.setAccessible(true);
+    return new LeaseManager() {
+
+      @Override
+      public void begin(DFSClient client, long inodeId) {
+        try {
+          beginFileLeaseMethod.invoke(client, inodeId, null);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void end(DFSClient client, long inodeId) {
+        try {
+          endFileLeaseMethod.invoke(client, inodeId);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+          throws NoSuchMethodException {
+    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+    @SuppressWarnings("rawtypes")
+    Class<? extends Enum> ecnClass;
+    try {
+      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+              .asSubclass(Enum.class);
+    } catch (ClassNotFoundException e) {
+      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
+              "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+              "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+    @SuppressWarnings("unchecked")
+    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+    Method combineHeaderMethod =
+            PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+    Method getStatusFromHeaderMethod =
+            PipelineAck.class.getMethod("getStatusFromHeader", int.class);
+    return new PipelineAckStatusGetter() {
+
+      @Override
+      public Status get(PipelineAckProto ack) {
+        try {
+          @SuppressWarnings("unchecked")
+          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
+          Integer headerFlag;
+          if (flagList.isEmpty()) {
+            Status reply = (Status) getReplyMethod.invoke(ack, 0);
+            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
+          } else {
+            headerFlag = flagList.get(0);
+          }
+          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+          throws NoSuchMethodException {
+    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+    return new PipelineAckStatusGetter() {
+
+      @Override
+      public Status get(PipelineAckProto ack) {
+        try {
+          return (Status) getStatusMethod.invoke(ack, 0);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+          throws NoSuchMethodException {
+    try {
+      return createPipelineAckStatusGetter27();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("Cannot get expected method " + e.getMessage() +
+              "; this is usually because your Hadoop is pre 2.7.0. " +
+              "Falling back to the Hadoop 2.6.x methods instead.");
+    }
+    return createPipelineAckStatusGetter26();
+  }
+
+  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+    Method setStorageTypeMethod =
+            OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
+    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
+      builder.put(storageTypeProto.name(), storageTypeProto);
+    }
+    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
+    return new StorageTypeSetter() {
+
+      @Override
+      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
+        Object protoEnum = name2ProtoEnum.get(storageType.name());
+        try {
+          setStorageTypeMethod.invoke(builder, protoEnum);
+        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+        return builder;
+      }
+    };
+  }
+
+  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+    for (Method method : ClientProtocol.class.getMethods()) {
+      if (method.getName().equals("addBlock")) {
+        Method addBlockMethod = method;
+        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+        if (paramTypes[paramTypes.length - 1] == String[].class) {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                                         ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                                         String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                        excludeNodes, fileId, favoredNodes);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        } else {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                                         ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                                         String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                        excludeNodes, fileId, favoredNodes, null);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        }
+      }
+    }
+    throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+  }
+
+  private static PBHelper createPBHelper() throws NoSuchMethodException {
+    Class<?> helperClass;
+    String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
+    try {
+      helperClass = Class.forName(clazzName);
+    } catch (ClassNotFoundException e) {
+      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+      LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
+              helperClass.toString() + " instead.");
+    }
+    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+    return new PBHelper() {
+
+      @Override
+      public ExtendedBlockProto convert(ExtendedBlock b) {
+        try {
+          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public TokenProto convert(Token<?> tok) {
+        try {
+          return (TokenProto) convertTokenMethod.invoke(null, tok);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
+          throws NoSuchMethodException {
+    for (Method method : confClass.getMethods()) {
+      if (method.getName().equals("createChecksum")) {
+        Method createChecksumMethod = method;
+        return new ChecksumCreater() {
+
+          @Override
+          public DataChecksum createChecksum(DFSClient client) {
+            try {
+              return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
+                      (Object) null);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      }
+    }
+    throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+  }
+
+  private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
+          throws NoSuchMethodException {
+    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+    createChecksumMethod.setAccessible(true);
+    return new ChecksumCreater() {
+
+      @Override
+      public DataChecksum createChecksum(DFSClient client) {
+        try {
+          return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static ChecksumCreater createChecksumCreater()
+          throws NoSuchMethodException, ClassNotFoundException {
+    Method getConfMethod = DFSClient.class.getMethod("getConf");
+    try {
+      return createChecksumCreater28(getConfMethod,
+              Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
+    } catch (ClassNotFoundException e) {
+      LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
+    }
+    return createChecksumCreater27(getConfMethod,
+            Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
+  }
+
+  private static FileCreator createFileCreator3() throws NoSuchMethodException {
+    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+            String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+            CryptoProtocolVersion[].class, String.class);
+
+    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+            supportedVersions) -> {
+      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
+              createParent, replication, blockSize, supportedVersions, null);
+    };
+  }
+
+  private static FileCreator createFileCreator2() throws NoSuchMethodException {
+    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+            String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+            CryptoProtocolVersion[].class);
+
+    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+            supportedVersions) -> {
+      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
+              createParent, replication, blockSize, supportedVersions);
+    };
+  }
+
+  private static FileCreator createFileCreator() throws NoSuchMethodException {
+    try {
+      return createFileCreator3();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
+    }
+    return createFileCreator2();
+  }
+
+  // cancel the processing if DFSClient is already closed.
+  static final class CancelOnClose implements CancelableProgressable {
+
+    private final DFSClient client;
+
+    public CancelOnClose(DFSClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public boolean progress() {
+      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
+    }
+  }
+
+  static {
+    try {
+      PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
+      STORAGE_TYPE_SETTER = createStorageTypeSetter();
+      BLOCK_ADDER = createBlockAdder();
+      LEASE_MANAGER = createLeaseManager();
+      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+      PB_HELPER = createPBHelper();
+      CHECKSUM_CREATER = createChecksumCreater();
+      FILE_CREATOR = createFileCreator();
+    } catch (Exception e) {
+      String msg = "Couldn't properly initialize access to HDFS internals. Please " +
+              "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+              "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+  }
+
+  static void beginFileLease(DFSClient client, long inodeId) {
+    LEASE_MANAGER.begin(client, inodeId);
+  }
+
+  static void endFileLease(DFSClient client, long inodeId) {
+    LEASE_MANAGER.end(client, inodeId);
+  }
+
+  static DataChecksum createChecksum(DFSClient client) {
+    return CHECKSUM_CREATER.createChecksum(client);
+  }
+
+  static Status getStatus(PipelineAckProto ack) {
+    return PIPELINE_ACK_STATUS_GETTER.get(ack);
+  }
+
+  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
+                                                Promise<Channel> promise, int timeoutMs) {
+    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+            new ProtobufVarint32FrameDecoder(),
+            new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
+            new SimpleChannelInboundHandler<BlockOpResponseProto>() {
+
+              @Override
+              protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
+                      throws Exception {
+                Status pipelineStatus = resp.getStatus();
+                if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
+                  throw new IOException("datanode " + dnInfo + " is restarting");
+                }
+                String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
+                if (resp.getStatus() != Status.SUCCESS) {
+                  if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+                    throw new InvalidBlockTokenException("Got access token error" + ", status message " +
+                            resp.getMessage() + ", " + logInfo);
+                  } else {
+                    throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
+                            ", status message " + resp.getMessage() + ", " + logInfo);
+                  }
+                }
+                // success
+                ChannelPipeline p = ctx.pipeline();
+                for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
+                  // do not remove all handlers because we may have wrap or unwrap handlers at the
+                  // head of the pipeline.
+                  if (handler instanceof IdleStateHandler) {
+                    break;
+                  }
+                }
+                // Disable auto read here. Enable it after we set up the streaming pipeline in
+                // FanOutOneBlockAsyncDFSOutput.
+                ctx.channel().config().setAutoRead(false);
+                promise.trySuccess(ctx.channel());
+              }
+
+              @Override
+              public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+                promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
+              }
+
+              @Override
+              public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+                if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+                  promise
+                          .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+                } else {
+                  super.userEventTriggered(ctx, evt);
+                }
+              }
+
+              @Override
+              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                promise.tryFailure(cause);
+              }
+            });
+  }
+
+  private static void requestWriteBlock(Channel channel, Enum<?> storageType,
+                                        OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
+    OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
+    int protoLen = proto.getSerializedSize();
+    ByteBuf buffer =
+            channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+    buffer.writeByte(Op.WRITE_BLOCK.code);
+    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
+    channel.writeAndFlush(buffer);
+  }
+
+  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+                                 Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+                                 DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
+          throws IOException {
+    Promise<Void> saslPromise = channel.eventLoop().newPromise();
+    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
+    saslPromise.addListener(new FutureListener<Void>() {
+
+      @Override
+      public void operationComplete(Future<Void> future) throws Exception {
+        if (future.isSuccess()) {
+          // set up the response processing pipeline first, then send the request.
+          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
+          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
+        } else {
+          promise.tryFailure(future.cause());
+        }
+      }
+    });
+  }
+
+  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
+                                                          String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
+                                                          BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
+                                                          Class<? extends Channel> channelClass) {
+    Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
+    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
+    boolean connectToDnViaHostname =
+            conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
+    blockCopy.setNumBytes(locatedBlock.getBlockSize());
+    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
+            .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
+                    .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
+            .setClientName(clientName).build();
+    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
+    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+            .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+            .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+            .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
+            .setRequestedChecksum(checksumProto)
+            .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
+    for (int i = 0; i < datanodeInfos.length; i++) {
+      DatanodeInfo dnInfo = datanodeInfos[i];
+      Enum<?> storageType = storageTypes[i];
+      Promise<Channel> promise = eventLoopGroup.next().newPromise();
+      futureList.add(promise);
+      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
+      new Bootstrap().group(eventLoopGroup).channel(channelClass)
+              .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
+
+        @Override
+        protected void initChannel(Channel ch) throws Exception {
+          // we need the remote address of the channel, so we can only move on after the
+          // channel has connected. Leave an empty implementation here because netty does not
+          // allow a null handler.
+        }
+      }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (future.isSuccess()) {
+            initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
+                    timeoutMs, client, locatedBlock.getBlockToken(), promise);
+          } else {
+            promise.tryFailure(future.cause());
+          }
+        }
+      });
+    }
+    return futureList;
+  }
+
+  /**
+   * Exception other than RemoteException thrown when calling create on the namenode.
+   */
+  public static class NameNodeException extends IOException {
+
+    private static final long serialVersionUID = 3143237406477095390L;
+
+    public NameNodeException(Throwable cause) {
+      super(cause);
+    }
+  }
+
+  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
+                                                           boolean overwrite, boolean createParent, short replication, long blockSize,
+                                                           EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+    Configuration conf = dfs.getConf();
+    FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+    DFSClient client = dfs.getClient();
+    String clientName = client.getClientName();
+    ClientProtocol namenode = client.getNamenode();
+    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+            DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+    for (int retry = 0;; retry++) {
+      HdfsFileStatus stat;
+      try {
+        stat = FILE_CREATOR.create(namenode, src,
+                FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+                new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+                createParent, replication, blockSize, CryptoProtocolVersion.supported());
+      } catch (Exception e) {
+        if (e instanceof RemoteException) {
+          throw (RemoteException) e;
+        } else {
+          throw new NameNodeException(e);
+        }
+      }
+      beginFileLease(client, stat.getFileId());
+      boolean succ = false;
+      LocatedBlock locatedBlock = null;
+      List<Future<Channel>> futureList = null;
+      try {
+        DataChecksum summer = createChecksum(client);
+        locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
+                excludesNodes, stat.getFileId(), null);
+        List<Channel> datanodeList = new ArrayList<>();
+        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+                PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
+        for (int i = 0, n = futureList.size(); i < n; i++) {
+          try {
+            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+          } catch (Exception e) {
+            // exclude the broken DN next time
+            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+            throw e;
+          }
+        }
+        Encryptor encryptor = createEncryptor(conf, stat, client);
+        FanOutOneBlockAsyncDFSOutput output =
+                new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+                        stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+        succ = true;
+        return output;
+      } catch (RemoteException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (shouldRetryCreate(e)) {
+          if (retry >= createMaxRetries) {
+            throw e.unwrapRemoteException();
+          }
+        } else {
+          throw e.unwrapRemoteException();
+        }
+      } catch (IOException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (retry >= createMaxRetries) {
+          throw e;
+        }
+        // overwrite the old broken file.
+        overwrite = true;
+        try {
+          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+        } catch (InterruptedException ie) {
+          throw new InterruptedIOException();
+        }
+      } finally {
+        if (!succ) {
+          if (futureList != null) {
+            for (Future<Channel> f : futureList) {
+              f.addListener(new FutureListener<Channel>() {
+
+                @Override
+                public void operationComplete(Future<Channel> future) throws Exception {
+                  if (future.isSuccess()) {
+                    future.getNow().close();
+                  }
+                }
+              });
+            }
+          }
+          endFileLease(client, stat.getFileId());
+        }
+      }
+    }
+  }
+
+  /**
+   * Create a {@link FanOutOneBlockAsyncDFSOutput}. This method may block, so do not call it
+   * inside an {@link EventLoop}.
+   */
+  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
+                                                          boolean overwrite, boolean createParent, short replication, long blockSize,
+                                                          EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
+
+      @Override
+      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
+              throws IOException, UnresolvedLinkException {
+        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
+                blockSize, eventLoopGroup, channelClass);
+      }
+
+      @Override
+      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+    }.resolve(dfs, f);
+  }
+
+  public static boolean shouldRetryCreate(RemoteException e) {
+    // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
+    // For exceptions other than this, we just throw them out. This is the same as
+    // DFSOutputStream.newStreamForCreate.
+    return e.getClassName().endsWith("RetryStartFileException");
+  }
+
+  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
+                           ExtendedBlock block, long fileId) {
+    for (int retry = 0;; retry++) {
+      try {
+        if (namenode.complete(src, clientName, block, fileId)) {
+          endFileLease(client, fileId);
+          return;
+        } else {
+          LOG.warn("complete file " + src + " not finished, retry = " + retry);
+        }
+      } catch (RemoteException e) {
+        IOException ioe = e.unwrapRemoteException();
+        if (ioe instanceof LeaseExpiredException) {
+          LOG.warn("lease for file " + src + " is expired, give up", e);
+          return;
+        } else {
+          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+        }
+      } catch (Exception e) {
+        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+      }
+      sleepIgnoreInterrupt(retry);
+    }
+  }
+
+  static void sleepIgnoreInterrupt(int retry) {
+    try {
+      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+    } catch (InterruptedException e) {
+    }
+  }
+}
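
For orientation, here is a minimal sketch of driving the public createOutput overload above against a live HDFS cluster. It is not part of the patch: the target path, replication, block size, and event-loop wiring are illustrative assumptions, and the write/flush/close calls follow the HBase 2.x AsyncFSOutput contract rather than any Metron-specific API.

package org.apache.hadoop.hbase.io.asyncfs;

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
import org.apache.hbase.thirdparty.io.netty.channel.socket.nio.NioSocketChannel;

// Hypothetical usage sketch of FanOutOneBlockAsyncDFSOutputHelper.createOutput.
public class CreateOutputSketch {

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Assumes fs.defaultFS points at an HDFS cluster; the target path is illustrative.
    Path target = new Path("/tmp/fan-out-demo");
    DistributedFileSystem dfs = (DistributedFileSystem) target.getFileSystem(conf);
    EventLoopGroup group = new NioEventLoopGroup();
    try {
      FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
          dfs, target, true /* overwrite */, false /* createParent */,
          (short) 3, 128 * 1024 * 1024L, group, NioSocketChannel.class);
      out.write("hello".getBytes(StandardCharsets.UTF_8));
      out.flush(false).get(); // AsyncFSOutput.flush returns a CompletableFuture<Long> in HBase 2.x
      out.close();
    } finally {
      group.shutdownGracefully();
    }
  }
}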
diff --git a/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
new file mode 100644
index 0000000..80ffb7e
--- /dev/null
+++ b/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -0,0 +1,783 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.Decryptor;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
+  private static final Logger LOG =
+          LoggerFactory.getLogger(
+              FanOutOneBlockAsyncDFSOutputSaslHelper.class);
+
+  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
+  }
+
+  private static final String SERVER_NAME = "0";
+  private static final String PROTOCOL = "hdfs";
+  private static final String MECHANISM = "DIGEST-MD5";
+  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+  private static final String NAME_DELIMITER = " ";
+
+  private interface SaslAdaptor {
+
+    TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
+
+    SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
+
+    AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
+  }
+
+  private static final SaslAdaptor SASL_ADAPTOR;
+
+  private interface TransparentCryptoHelper {
+
+    Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
+            throws IOException;
+  }
+
+  private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
+
+  private static SaslAdaptor createSaslAdaptor()
+          throws NoSuchFieldException, NoSuchMethodException {
+    Field saslPropsResolverField =
+            SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
+    saslPropsResolverField.setAccessible(true);
+    Field trustedChannelResolverField =
+            SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
+    trustedChannelResolverField.setAccessible(true);
+    Field fallbackToSimpleAuthField =
+            SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
+    fallbackToSimpleAuthField.setAccessible(true);
+    return new SaslAdaptor() {
+
+      @Override
+      public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
+        try {
+          return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
+        try {
+          return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
+        try {
+          return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
+          throws NoSuchMethodException {
+    Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
+            .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
+    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
+    return new TransparentCryptoHelper() {
+
+      @Override
+      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+                                       DFSClient client) throws IOException {
+        try {
+          KeyVersion decryptedKey =
+                  (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
+          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+          Encryptor encryptor = cryptoCodec.createEncryptor();
+          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
+          return encryptor;
+        } catch (InvocationTargetException e) {
+          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+          throw new RuntimeException(e.getTargetException());
+        } catch (GeneralSecurityException e) {
+          throw new IOException(e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
+          throws ClassNotFoundException, NoSuchMethodException {
+    Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
+    Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
+            "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
+    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
+    return new TransparentCryptoHelper() {
+
+      @Override
+      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+                                       DFSClient client) throws IOException {
+        try {
+          KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod
+                  .invoke(null, feInfo, client.getKeyProvider());
+          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+          Encryptor encryptor = cryptoCodec.createEncryptor();
+          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
+          return encryptor;
+        } catch (InvocationTargetException e) {
+          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+          throw new RuntimeException(e.getTargetException());
+        } catch (GeneralSecurityException e) {
+          throw new IOException(e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelper()
+          throws NoSuchMethodException, ClassNotFoundException {
+    try {
+      return createTransparentCryptoHelperWithoutHDFS12396();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
+              " should be hadoop version with HDFS-12396", e);
+    }
+    return createTransparentCryptoHelperWithHDFS12396();
+  }
+
+  static {
+    try {
+      SASL_ADAPTOR = createSaslAdaptor();
+      TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+    } catch (Exception e) {
+      String msg = "Couldn't properly initialize access to HDFS internals. Please "
+              + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+              + "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+  }
+
+  /**
+   * Sets user name and password when asked by the client-side SASL object.
+   */
+  private static final class SaslClientCallbackHandler implements CallbackHandler {
+
+    private final char[] password;
+    private final String userName;
+
+    /**
+     * Creates a new SaslClientCallbackHandler.
+     * @param userName SASL user name
+     * @param password SASL password
+     */
+    public SaslClientCallbackHandler(String userName, char[] password) {
+      this.password = password;
+      this.userName = userName;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        pc.setPassword(password);
+      }
+      if (rc != null) {
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+
+  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
+
+    private final Configuration conf;
+
+    private final Map<String, String> saslProps;
+
+    private final SaslClient saslClient;
+
+    private final int timeoutMs;
+
+    private final Promise<Void> promise;
+
+    private final DFSClient dfsClient;
+
+    private int step = 0;
+
+    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
+                                Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
+                                DFSClient dfsClient) throws SaslException {
+      this.conf = conf;
+      this.saslProps = saslProps;
+      this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
+              SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
+      this.timeoutMs = timeoutMs;
+      this.promise = promise;
+      this.dfsClient = dfsClient;
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
+      sendSaslMessage(ctx, payload, null);
+    }
+
+    private List<CipherOption> getCipherOptions() throws IOException {
+      // Negotiate cipher suites if configured. Currently, the only supported
+      // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+      // values for future expansion.
+      String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+      if (StringUtils.isBlank(cipherSuites)) {
+        return null;
+      }
+      if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+        throw new IOException(String.format("Invalid cipher suite, %s=%s",
+                DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+      }
+      return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+                                 List<CipherOption> options) throws IOException {
+      DataTransferEncryptorMessageProto.Builder builder =
+              DataTransferEncryptorMessageProto.newBuilder();
+      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+      if (payload != null) {
+        // Was ByteStringer; fixed without using ByteStringer. It's in hbase-protocol
+        // and we want to keep that out of hbase-server.
+        builder.setPayload(ByteString.copyFrom(payload));
+      }
+      if (options != null) {
+        builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
+      }
+      DataTransferEncryptorMessageProto proto = builder.build();
+      int size = proto.getSerializedSize();
+      size += CodedOutputStream.computeRawVarint32Size(size);
+      ByteBuf buf = ctx.alloc().buffer(size);
+      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
+      ctx.write(buf);
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+      sendSaslMessage(ctx, new byte[0]);
+      ctx.flush();
+      step++;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
+      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+        dfsClient.clearDataEncryptionKey();
+        throw new InvalidEncryptionKeyException(proto.getMessage());
+      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+        throw new IOException(proto.getMessage());
+      }
+    }
+
+    private String getNegotiatedQop() {
+      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    }
+
+    private boolean isNegotiatedQopPrivacy() {
+      String qop = getNegotiatedQop();
+      return qop != null && "auth-conf".equalsIgnoreCase(qop);
+    }
+
+    private boolean requestedQopContainsPrivacy() {
+      Set<String> requestedQop =
+              ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      return requestedQop.contains("auth-conf");
+    }
+
+    private void checkSaslComplete() throws IOException {
+      if (!saslClient.isComplete()) {
+        throw new IOException("Failed to complete SASL handshake");
+      }
+      Set<String> requestedQop =
+              ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      String negotiatedQop = getNegotiatedQop();
+      LOG.debug(
+              "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
+      if (!requestedQop.contains(negotiatedQop)) {
+        throw new IOException(String.format("SASL handshake completed, but "
+                        + "channel does not have acceptable quality of protection, "
+                        + "requested = %s, negotiated = %s",
+                requestedQop, negotiatedQop));
+      }
+    }
+
+    private boolean useWrap() {
+      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+      return qop != null && !"auth".equalsIgnoreCase(qop);
+    }
+
+    private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
+      byte[] inKey = option.getInKey();
+      if (inKey != null) {
+        inKey = saslClient.unwrap(inKey, 0, inKey.length);
+      }
+      byte[] outKey = option.getOutKey();
+      if (outKey != null) {
+        outKey = saslClient.unwrap(outKey, 0, outKey.length);
+      }
+      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
+              option.getOutIv());
+    }
+
+    private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
+                                         boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+      List<CipherOption> cipherOptions =
+              PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
+      if (cipherOptions == null || cipherOptions.isEmpty()) {
+        return null;
+      }
+      CipherOption cipherOption = cipherOptions.get(0);
+      return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+      if (msg instanceof DataTransferEncryptorMessageProto) {
+        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
+        check(proto);
+        byte[] challenge = proto.getPayload().toByteArray();
+        byte[] response = saslClient.evaluateChallenge(challenge);
+        switch (step) {
+          case 1: {
+            List<CipherOption> cipherOptions = null;
+            if (requestedQopContainsPrivacy()) {
+              cipherOptions = getCipherOptions();
+            }
+            sendSaslMessage(ctx, response, cipherOptions);
+            ctx.flush();
+            step++;
+            break;
+          }
+          case 2: {
+            assert response == null;
+            checkSaslComplete();
+            CipherOption cipherOption =
+                    getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+            ChannelPipeline p = ctx.pipeline();
+            while (p.first() != null) {
+              p.removeFirst();
+            }
+            if (cipherOption != null) {
+              CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
+              p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
+                      new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
+            } else {
+              if (useWrap()) {
+                p.addLast(new SaslWrapHandler(saslClient),
+                        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+                        new SaslUnwrapHandler(saslClient));
+              }
+            }
+            promise.trySuccess(null);
+            break;
+          }
+          default:
+            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+        }
+      } else {
+        ctx.fireChannelRead(msg);
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      promise.tryFailure(cause);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+      } else {
+        super.userEventTriggered(ctx, evt);
+      }
+    }
+  }
+
+  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final SaslClient saslClient;
+
+    public SaslUnwrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      msg.skipBytes(4);
+      byte[] b = new byte[msg.readableBytes()];
+      msg.readBytes(b);
+      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
+    }
+  }
+
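+  // Outbound counterpart of SaslUnwrapHandler: buffers outgoing bytes in a CompositeByteBuf and,
+  // on flush, wraps them with SaslClient.wrap and writes a 4-byte length prefix followed by the
+  // wrapped payload, matching the framing the unwrap side expects.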
+  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+    private final SaslClient saslClient;
+
+    private CompositeByteBuf cBuf;
+
+    public SaslWrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+            throws Exception {
+      if (msg instanceof ByteBuf) {
+        ByteBuf buf = (ByteBuf) msg;
+        cBuf.addComponent(buf);
+        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
+      } else {
+        ctx.write(msg);
+      }
+    }
+
+    @Override
+    public void flush(ChannelHandlerContext ctx) throws Exception {
+      if (cBuf.isReadable()) {
+        byte[] b = new byte[cBuf.readableBytes()];
+        cBuf.readBytes(b);
+        cBuf.discardReadComponents();
+        byte[] wrapped = saslClient.wrap(b, 0, b.length);
+        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
+        buf.writeInt(wrapped.length);
+        buf.writeBytes(wrapped);
+        ctx.write(buf);
+      }
+      ctx.flush();
+    }
+
+    @Override
+    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+      cBuf.release();
+      cBuf = null;
+    }
+  }
+
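+  // Decrypts inbound traffic once an AES cipher option has been negotiated. If the incoming
+  // ByteBuf is not backed by a single NIO buffer, it is first copied into a direct buffer so the
+  // codec can work on one contiguous ByteBuffer.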
+  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final Decryptor decryptor;
+
+    public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+            throws GeneralSecurityException, IOException {
+      this.decryptor = codec.createDecryptor();
+      this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
+      ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
+      decryptor.decrypt(inBuffer, outBuffer);
+      outBuf.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+      ctx.fireChannelRead(outBuf);
+    }
+  }
+
+  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
+
+    private final Encryptor encryptor;
+
+    public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+            throws GeneralSecurityException, IOException {
+      this.encryptor = codec.createEncryptor();
+      this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
+    }
+
+    @Override
+    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
+            throws Exception {
+      if (preferDirect) {
+        return ctx.alloc().directBuffer(msg.readableBytes());
+      } else {
+        return ctx.alloc().buffer(msg.readableBytes());
+      }
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
+      encryptor.encrypt(inBuffer, outBuffer);
+      out.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+    }
+  }
+
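+  // For an encrypted handshake the SASL username is derived from the data encryption key
+  // (key id, block pool id and base64-encoded nonce joined by NAME_DELIMITER) and the password
+  // from the key material, matching the convention the datanode uses on its side.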
+  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
+    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
+            + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+  }
+
+  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
+  }
+
+  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8);
+  }
+
+  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8)
+            .toCharArray();
+  }
+
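+  // SASL properties used for an encrypted handshake: privacy ("auth-conf") QOP, mandatory server
+  // authentication, and the DIGEST-MD5 cipher taken from the negotiated data encryption key
+  // (typically "3des" or "rc4", as configured for encrypted data transfer on the cluster).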
+  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
+    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+    return saslProps;
+  }
+
+  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
+                                        String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
+                                        DFSClient dfsClient) {
+    try {
+      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+              new ProtobufVarint32FrameDecoder(),
+              new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
+              new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise,
+                      dfsClient));
+    } catch (SaslException e) {
+      saslPromise.tryFailure(e);
+    }
+  }
+
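+  // Decides how (or whether) to authenticate to a datanode before streaming: trusted channels,
+  // unsecured clusters, privileged datanode ports and fallback-to-simple-auth all skip SASL;
+  // clusters with data transfer encryption do an encrypted handshake keyed by the data encryption
+  // key; otherwise a general handshake is performed using the block access token as credentials.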
+  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+                               int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
+                               Promise<Void> saslPromise) throws IOException {
+    SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
+    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
+    TrustedChannelResolver trustedChannelResolver =
+            SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
+    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
+    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
+      saslPromise.trySuccess(null);
+      return;
+    }
+    DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
+    if (encryptionKey != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+                "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
+              encryptionKeyToPassword(encryptionKey.encryptionKey),
+              createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
+              client);
+    } else if (!UserGroupInformation.isSecurityEnabled()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
+                + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (dnInfo.getXferPort() < 1024) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+                + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+                + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (saslPropsResolver != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+                "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
+              buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
+              client);
+    } else {
+      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
+      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
+      // edge case.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
+                + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    }
+  }
+
+  static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
+          throws IOException {
+    FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
+    if (feInfo == null) {
+      return null;
+    }
+    return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
+  }
+}
diff --git a/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java b/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
index 08e8abd..6724095 100644
--- a/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
+++ b/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
@@ -169,7 +169,9 @@ public class EnrichmentCoprocessorIntegrationTest extends BaseIntegrationTest {
 
   @AfterClass
   public static void teardown() throws Exception {
-    HBaseUtil.INSTANCE.teardown(testUtil);
+    if (null != testUtil) {
+      HBaseUtil.INSTANCE.teardown(testUtil);
+    }
     componentRunner.stop();
     resetLogging();
   }
diff --git a/metron-platform/metron-hbase/metron-hbase-common/pom.xml b/metron-platform/metron-hbase/metron-hbase-common/pom.xml
index 548d976..233519a 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/pom.xml
+++ b/metron-platform/metron-hbase/metron-hbase-common/pom.xml
@@ -101,7 +101,6 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-common</artifactId>
-            <classifier>tests</classifier>
             <version>${global_hadoop_version}</version>
             <scope>provided</scope>
             <exclusions>
@@ -129,7 +128,6 @@
         <dependency>
             <groupId>org.apache.hadoop</groupId>
             <artifactId>hadoop-hdfs</artifactId>
-            <classifier>tests</classifier>
             <version>${global_hadoop_version}</version>
             <scope>provided</scope>
             <exclusions>
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
new file mode 100644
index 0000000..a328de9
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -0,0 +1,899 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputHelper {
+  private static final Logger LOG =
+          LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);
+
+  private FanOutOneBlockAsyncDFSOutputHelper() {
+  }
+
+  public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
+
+  public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
+  // use pooled allocator for performance.
+  private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
+
+  // copied from DFSPacket since it is package private.
+  public static final long HEART_BEAT_SEQNO = -1L;
+
+  // Timeouts for communicating with DataNode for streaming writes/reads
+  public static final int READ_TIMEOUT = 60 * 1000;
+
+  private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
+
+  // Helper class for getting the Status from a PipelineAckProto. In hadoop 2.6 and earlier there
+  // is a getStatus method; in hadoop 2.7 and later the status is retrieved from a flag. The flag
+  // may come from the proto directly, or be combined from the proto's reply field and an ECN
+  // value. See createPipelineAckStatusGetter for more details.
+  private interface PipelineAckStatusGetter {
+    Status get(PipelineAckProto ack);
+  }
+
+  private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
+
+  // The StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and under o.a.h.fs in hadoop
+  // 2.7, so here we need to use reflection to set it. See createStorageTypeSetter for more details.
+  private interface StorageTypeSetter {
+    OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
+  }
+
+  private static final StorageTypeSetter STORAGE_TYPE_SETTER;
+
+  // Helper class for calling the addBlock method on the namenode. There is an addBlockFlags
+  // parameter for hadoop 2.8 or later. See createBlockAdder for more details.
+  private interface BlockAdder {
+
+    LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                          ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+            throws IOException;
+  }
+
+  private static final BlockAdder BLOCK_ADDER;
+
+  private interface LeaseManager {
+
+    void begin(DFSClient client, long inodeId);
+
+    void end(DFSClient client, long inodeId);
+  }
+
+  private static final LeaseManager LEASE_MANAGER;
+
+  // This is used to terminate a recoverFileLease call when FileSystem is already closed.
+  // isClientRunning is not public so we need to use reflection.
+  private interface DFSClientAdaptor {
+
+    boolean isClientRunning(DFSClient client);
+  }
+
+  private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
+
+  // Helper class for converting protos.
+  private interface PBHelper {
+
+    ExtendedBlockProto convert(ExtendedBlock b);
+
+    TokenProto convert(Token<?> tok);
+  }
+
+  private static final PBHelper PB_HELPER;
+
+  // helper class for creating data checksum.
+  private interface ChecksumCreater {
+    DataChecksum createChecksum(DFSClient client);
+  }
+
+  private static final ChecksumCreater CHECKSUM_CREATER;
+
+  // helper class for creating files.
+  private interface FileCreator {
+    default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
+                                  String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
+                                  short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
+            throws Exception {
+      try {
+        return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
+                replication, blockSize, supportedVersions);
+      } catch (InvocationTargetException e) {
+        if (e.getCause() instanceof Exception) {
+          throw (Exception) e.getCause();
+        } else {
+          throw new RuntimeException(e.getCause());
+        }
+      }
+    };
+
+    Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
+                        EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
+                        CryptoProtocolVersion[] supportedVersions) throws Exception;
+  }
+
+  private static final FileCreator FILE_CREATOR;
+
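+  // Each adaptor below is resolved once via reflection so the same code can run against several
+  // Hadoop versions; if no known signature matches, the static initializer fails fast with an
+  // Error pointing at HBASE-16110.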
+  private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+    Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+    isClientRunningMethod.setAccessible(true);
+    return new DFSClientAdaptor() {
+
+      @Override
+      public boolean isClientRunning(DFSClient client) {
+        try {
+          return (Boolean) isClientRunningMethod.invoke(client);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+    Method beginFileLeaseMethod =
+            DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
+    beginFileLeaseMethod.setAccessible(true);
+    Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+    endFileLeaseMethod.setAccessible(true);
+    return new LeaseManager() {
+
+      @Override
+      public void begin(DFSClient client, long inodeId) {
+        try {
+          beginFileLeaseMethod.invoke(client, inodeId, null);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public void end(DFSClient client, long inodeId) {
+        try {
+          endFileLeaseMethod.invoke(client, inodeId);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+          throws NoSuchMethodException {
+    Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+    @SuppressWarnings("rawtypes")
+    Class<? extends Enum> ecnClass;
+    try {
+      ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+              .asSubclass(Enum.class);
+    } catch (ClassNotFoundException e) {
+      String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
+              "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+              "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+    @SuppressWarnings("unchecked")
+    Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+    Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+    Method combineHeaderMethod =
+            PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+    Method getStatusFromHeaderMethod =
+            PipelineAck.class.getMethod("getStatusFromHeader", int.class);
+    return new PipelineAckStatusGetter() {
+
+      @Override
+      public Status get(PipelineAckProto ack) {
+        try {
+          @SuppressWarnings("unchecked")
+          List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
+          Integer headerFlag;
+          if (flagList.isEmpty()) {
+            Status reply = (Status) getReplyMethod.invoke(ack, 0);
+            headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
+          } else {
+            headerFlag = flagList.get(0);
+          }
+          return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+          throws NoSuchMethodException {
+    Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+    return new PipelineAckStatusGetter() {
+
+      @Override
+      public Status get(PipelineAckProto ack) {
+        try {
+          return (Status) getStatusMethod.invoke(ack, 0);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+          throws NoSuchMethodException {
+    try {
+      return createPipelineAckStatusGetter27();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("Can not get expected method " + e.getMessage() +
+              ", this usually because your Hadoop is pre 2.7.0, " +
+              "try the methods in Hadoop 2.6.x instead.");
+    }
+    return createPipelineAckStatusGetter26();
+  }
+
+  private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+    Method setStorageTypeMethod =
+            OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+    ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
+    for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
+      builder.put(storageTypeProto.name(), storageTypeProto);
+    }
+    ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
+    return new StorageTypeSetter() {
+
+      @Override
+      public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
+        Object protoEnum = name2ProtoEnum.get(storageType.name());
+        try {
+          setStorageTypeMethod.invoke(builder, protoEnum);
+        } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+        return builder;
+      }
+    };
+  }
+
+  private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+    for (Method method : ClientProtocol.class.getMethods()) {
+      if (method.getName().equals("addBlock")) {
+        Method addBlockMethod = method;
+        Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+        if (paramTypes[paramTypes.length - 1] == String[].class) {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                                         ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                                         String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                        excludeNodes, fileId, favoredNodes);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        } else {
+          return new BlockAdder() {
+
+            @Override
+            public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+                                         ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+                                         String[] favoredNodes) throws IOException {
+              try {
+                return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+                        excludeNodes, fileId, favoredNodes, null);
+              } catch (IllegalAccessException e) {
+                throw new RuntimeException(e);
+              } catch (InvocationTargetException e) {
+                Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+                throw new RuntimeException(e);
+              }
+            }
+          };
+        }
+      }
+    }
+    throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+  }
+
+  private static PBHelper createPBHelper() throws NoSuchMethodException {
+    Class<?> helperClass;
+    String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
+    try {
+      helperClass = Class.forName(clazzName);
+    } catch (ClassNotFoundException e) {
+      helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+      LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
+              helperClass.toString() + " instead.");
+    }
+    Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+    Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+    return new PBHelper() {
+
+      @Override
+      public ExtendedBlockProto convert(ExtendedBlock b) {
+        try {
+          return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public TokenProto convert(Token<?> tok) {
+        try {
+          return (TokenProto) convertTokenMethod.invoke(null, tok);
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
+          throws NoSuchMethodException {
+    for (Method method : confClass.getMethods()) {
+      if (method.getName().equals("createChecksum")) {
+        Method createChecksumMethod = method;
+        return new ChecksumCreater() {
+
+          @Override
+          public DataChecksum createChecksum(DFSClient client) {
+            try {
+              return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
+                      (Object) null);
+            } catch (IllegalAccessException | InvocationTargetException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        };
+      }
+    }
+    throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+  }
+
+  private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
+          throws NoSuchMethodException {
+    Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+    createChecksumMethod.setAccessible(true);
+    return new ChecksumCreater() {
+
+      @Override
+      public DataChecksum createChecksum(DFSClient client) {
+        try {
+          return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
+        } catch (IllegalAccessException | InvocationTargetException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static ChecksumCreater createChecksumCreater()
+          throws NoSuchMethodException, ClassNotFoundException {
+    Method getConfMethod = DFSClient.class.getMethod("getConf");
+    try {
+      return createChecksumCreater28(getConfMethod,
+              Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
+    } catch (ClassNotFoundException e) {
+      LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
+    }
+    return createChecksumCreater27(getConfMethod,
+            Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
+  }
+
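+  // Hadoop 3.x adds a trailing String parameter (the erasure coding policy name) to
+  // ClientProtocol#create; we pass null to use the default. Hadoop 2.x keeps the shorter
+  // signature handled by createFileCreator2 below.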
+  private static FileCreator createFileCreator3() throws NoSuchMethodException {
+    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+            String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+            CryptoProtocolVersion[].class, String.class);
+
+    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+            supportedVersions) -> {
+      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
+              createParent, replication, blockSize, supportedVersions, null);
+    };
+  }
+
+  private static FileCreator createFileCreator2() throws NoSuchMethodException {
+    Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+            String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+            CryptoProtocolVersion[].class);
+
+    return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+            supportedVersions) -> {
+      return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
+              createParent, replication, blockSize, supportedVersions);
+    };
+  }
+
+  private static FileCreator createFileCreator() throws NoSuchMethodException {
+    try {
+      return createFileCreator3();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
+    }
+    return createFileCreator2();
+  }
+
+  // cancel the processing if DFSClient is already closed.
+  static final class CancelOnClose implements CancelableProgressable {
+
+    private final DFSClient client;
+
+    public CancelOnClose(DFSClient client) {
+      this.client = client;
+    }
+
+    @Override
+    public boolean progress() {
+      return DFS_CLIENT_ADAPTOR.isClientRunning(client);
+    }
+  }
+
+  static {
+    try {
+      PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
+      STORAGE_TYPE_SETTER = createStorageTypeSetter();
+      BLOCK_ADDER = createBlockAdder();
+      LEASE_MANAGER = createLeaseManager();
+      DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+      PB_HELPER = createPBHelper();
+      CHECKSUM_CREATER = createChecksumCreater();
+      FILE_CREATOR = createFileCreator();
+    } catch (Exception e) {
+      String msg = "Couldn't properly initialize access to HDFS internals. Please " +
+              "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+              "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+  }
+
+  static void beginFileLease(DFSClient client, long inodeId) {
+    LEASE_MANAGER.begin(client, inodeId);
+  }
+
+  static void endFileLease(DFSClient client, long inodeId) {
+    LEASE_MANAGER.end(client, inodeId);
+  }
+
+  static DataChecksum createChecksum(DFSClient client) {
+    return CHECKSUM_CREATER.createChecksum(client);
+  }
+
+  static Status getStatus(PipelineAckProto ack) {
+    return PIPELINE_ACK_STATUS_GETTER.get(ack);
+  }
+
+  private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
+                                                Promise<Channel> promise, int timeoutMs) {
+    channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+            new ProtobufVarint32FrameDecoder(),
+            new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
+            new SimpleChannelInboundHandler<BlockOpResponseProto>() {
+
+              @Override
+              protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
+                      throws Exception {
+                Status pipelineStatus = resp.getStatus();
+                if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
+                  throw new IOException("datanode " + dnInfo + " is restarting");
+                }
+                String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
+                if (resp.getStatus() != Status.SUCCESS) {
+                  if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+                    throw new InvalidBlockTokenException("Got access token error" + ", status message " +
+                            resp.getMessage() + ", " + logInfo);
+                  } else {
+                    throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
+                            ", status message " + resp.getMessage() + ", " + logInfo);
+                  }
+                }
+                // success
+                ChannelPipeline p = ctx.pipeline();
+                for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
+                  // Do not remove all handlers, because we may have wrap or unwrap handlers at
+                  // the head of the pipeline.
+                  if (handler instanceof IdleStateHandler) {
+                    break;
+                  }
+                }
+                // Disable auto read here. Enable it after we set up the streaming pipeline in
+                // FanOutOneBlockAsyncDFSOutput.
+                ctx.channel().config().setAutoRead(false);
+                promise.trySuccess(ctx.channel());
+              }
+
+              @Override
+              public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+                promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
+              }
+
+              @Override
+              public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+                if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+                  promise
+                          .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+                } else {
+                  super.userEventTriggered(ctx, evt);
+                }
+              }
+
+              @Override
+              public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+                promise.tryFailure(cause);
+              }
+            });
+  }
+
+  private static void requestWriteBlock(Channel channel, Enum<?> storageType,
+                                        OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
+    OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
+    int protoLen = proto.getSerializedSize();
+    ByteBuf buffer =
+            channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+    buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+    buffer.writeByte(Op.WRITE_BLOCK.code);
+    proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
+    channel.writeAndFlush(buffer);
+  }
+
+  private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+                                 Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+                                 DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
+          throws IOException {
+    Promise<Void> saslPromise = channel.eventLoop().newPromise();
+    trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
+    saslPromise.addListener(new FutureListener<Void>() {
+
+      @Override
+      public void operationComplete(Future<Void> future) throws Exception {
+        if (future.isSuccess()) {
+          // setup response processing pipeline first, then send request.
+          processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
+          requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
+        } else {
+          promise.tryFailure(future.cause());
+        }
+      }
+    });
+  }
+
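+  // Opens a separate netty connection to each datanode of the located block and issues a
+  // pipelineSize=1 WRITE_BLOCK request on every one of them; this per-datanode fan-out (rather
+  // than a chained pipeline) is what gives the output its name. Each returned future completes
+  // once that datanode's SASL negotiation and BlockOpResponse succeed.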
+  private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
+                                                          String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
+                                                          BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
+                                                          Class<? extends Channel> channelClass) {
+    Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
+    DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
+    boolean connectToDnViaHostname =
+            conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+    int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+    ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
+    blockCopy.setNumBytes(locatedBlock.getBlockSize());
+    ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
+            .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
+                    .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
+            .setClientName(clientName).build();
+    ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
+    OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+            .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+            .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+            .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
+            .setRequestedChecksum(checksumProto)
+            .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+    List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
+    for (int i = 0; i < datanodeInfos.length; i++) {
+      DatanodeInfo dnInfo = datanodeInfos[i];
+      Enum<?> storageType = storageTypes[i];
+      Promise<Channel> promise = eventLoopGroup.next().newPromise();
+      futureList.add(promise);
+      String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
+      new Bootstrap().group(eventLoopGroup).channel(channelClass)
+              .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
+
+        @Override
+        protected void initChannel(Channel ch) throws Exception {
+          // We need the remote address of the channel, so we can only move on after the channel
+          // is connected. Leave an empty implementation here because netty does not allow a null
+          // handler.
+        }
+      }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
+
+        @Override
+        public void operationComplete(ChannelFuture future) throws Exception {
+          if (future.isSuccess()) {
+            initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
+                    timeoutMs, client, locatedBlock.getBlockToken(), promise);
+          } else {
+            promise.tryFailure(future.cause());
+          }
+        }
+      });
+    }
+    return futureList;
+  }
+
+  /**
+   * An exception other than RemoteException thrown when calling create on the namenode.
+   */
+  public static class NameNodeException extends IOException {
+
+    private static final long serialVersionUID = 3143237406477095390L;
+
+    public NameNodeException(Throwable cause) {
+      super(cause);
+    }
+  }
+
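+  // Creates the file on the namenode and connects to every datanode of the first block. Datanodes
+  // that fail to connect are excluded on the next attempt, and the whole create is retried up to
+  // hbase.fs.async.create.retries times (default 10) before giving up.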
+  private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
+                                                           boolean overwrite, boolean createParent, short replication, long blockSize,
+                                                           EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+    Configuration conf = dfs.getConf();
+    FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+    DFSClient client = dfs.getClient();
+    String clientName = client.getClientName();
+    ClientProtocol namenode = client.getNamenode();
+    int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+            DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+    DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+    for (int retry = 0;; retry++) {
+      HdfsFileStatus stat;
+      try {
+        stat = FILE_CREATOR.create(namenode, src,
+                FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+                new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+                createParent, replication, blockSize, CryptoProtocolVersion.supported());
+      } catch (Exception e) {
+        if (e instanceof RemoteException) {
+          throw (RemoteException) e;
+        } else {
+          throw new NameNodeException(e);
+        }
+      }
+      beginFileLease(client, stat.getFileId());
+      boolean succ = false;
+      LocatedBlock locatedBlock = null;
+      List<Future<Channel>> futureList = null;
+      try {
+        DataChecksum summer = createChecksum(client);
+        locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
+                excludesNodes, stat.getFileId(), null);
+        List<Channel> datanodeList = new ArrayList<>();
+        futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+                PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
+        for (int i = 0, n = futureList.size(); i < n; i++) {
+          try {
+            datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+          } catch (Exception e) {
+            // exclude the broken DN next time
+            excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+            throw e;
+          }
+        }
+        Encryptor encryptor = createEncryptor(conf, stat, client);
+        FanOutOneBlockAsyncDFSOutput output =
+                new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+                        stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+        succ = true;
+        return output;
+      } catch (RemoteException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (shouldRetryCreate(e)) {
+          if (retry >= createMaxRetries) {
+            throw e.unwrapRemoteException();
+          }
+        } else {
+          throw e.unwrapRemoteException();
+        }
+      } catch (IOException e) {
+        LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+        if (retry >= createMaxRetries) {
+          throw e;
+        }
+        // overwrite the old broken file.
+        overwrite = true;
+        try {
+          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+        } catch (InterruptedException ie) {
+          throw new InterruptedIOException();
+        }
+      } finally {
+        if (!succ) {
+          if (futureList != null) {
+            for (Future<Channel> f : futureList) {
+              f.addListener(new FutureListener<Channel>() {
+
+                @Override
+                public void operationComplete(Future<Channel> future) throws Exception {
+                  if (future.isSuccess()) {
+                    future.getNow().close();
+                  }
+                }
+              });
+            }
+          }
+          endFileLease(client, stat.getFileId());
+        }
+      }
+    }
+  }
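+
+  // Note on the retry loop above (a summary of existing behavior, not a change): every datanode
+  // whose connection fails is added to excludesNodes so the next addBlock call avoids it; a
+  // RemoteException is retried only when shouldRetryCreate matches RetryStartFileException; any
+  // other IOException forces overwrite = true and backs off before the next attempt; and on every
+  // failed attempt the half-open channels are closed and the file lease is released.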
+
+  /**
+   * Create a {@link FanOutOneBlockAsyncDFSOutput}. The method may block, so do not call it
+   * inside an {@link EventLoop}.
+   */
+  public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
+                                                          boolean overwrite, boolean createParent, short replication, long blockSize,
+                                                          EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+    return new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
+
+      @Override
+      public FanOutOneBlockAsyncDFSOutput doCall(Path p)
+              throws IOException, UnresolvedLinkException {
+        return createOutput(dfs, p.toUri().getPath(), overwrite, createParent, replication,
+                blockSize, eventLoopGroup, channelClass);
+      }
+
+      @Override
+      public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
+        throw new UnsupportedOperationException();
+      }
+    }.resolve(dfs, f);
+  }
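+
+  // A minimal usage sketch (assumed caller code, not part of this patch). The event loop group,
+  // channel class, path, replication and block size below are illustrative assumptions; the only
+  // hard requirement, per the javadoc above, is that createOutput is invoked from a regular
+  // thread, never from inside an EventLoop.
+  //
+  //   EventLoopGroup group = new NioEventLoopGroup();
+  //   FanOutOneBlockAsyncDFSOutput out = FanOutOneBlockAsyncDFSOutputHelper.createOutput(
+  //       dfs, new Path("/tmp/example"), true, false, (short) 3, 128 * 1024 * 1024L,
+  //       group, NioSocketChannel.class);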
+
+  public static boolean shouldRetryCreate(RemoteException e) {
+    // RetryStartFileException was introduced in HDFS 2.6+, so here we can only match on the class
+    // name. Any other exception is simply rethrown by the caller. This is the same behavior as
+    // DFSOutputStream.newStreamForCreate.
+    return e.getClassName().endsWith("RetryStartFileException");
+  }
+
+  static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
+                           ExtendedBlock block, long fileId) {
+    for (int retry = 0;; retry++) {
+      try {
+        if (namenode.complete(src, clientName, block, fileId)) {
+          endFileLease(client, fileId);
+          return;
+        } else {
+          LOG.warn("complete file " + src + " not finished, retry = " + retry);
+        }
+      } catch (RemoteException e) {
+        IOException ioe = e.unwrapRemoteException();
+        if (ioe instanceof LeaseExpiredException) {
+          LOG.warn("lease for file " + src + " is expired, give up", e);
+          return;
+        } else {
+          LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+        }
+      } catch (Exception e) {
+        LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+      }
+      sleepIgnoreInterrupt(retry);
+    }
+  }
+
+  static void sleepIgnoreInterrupt(int retry) {
+    try {
+      Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+    } catch (InterruptedException e) {
+      // deliberately swallow the interrupt, as the method name indicates; the caller keeps retrying
+    }
+  }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
new file mode 100644
index 0000000..08acb13
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -0,0 +1,783 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.Decryptor;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
+  private static final Logger LOG =
+          LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
+
+  private FanOutOneBlockAsyncDFSOutputSaslHelper() {
+  }
+
+  private static final String SERVER_NAME = "0";
+  private static final String PROTOCOL = "hdfs";
+  private static final String MECHANISM = "DIGEST-MD5";
+  private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+  private static final String NAME_DELIMITER = " ";
+
+  private interface SaslAdaptor {
+
+    TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
+
+    SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
+
+    AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
+  }
+
+  private static final SaslAdaptor SASL_ADAPTOR;
+
+  private interface TransparentCryptoHelper {
+
+    Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
+            throws IOException;
+  }
+
+  private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
+
+  private static SaslAdaptor createSaslAdaptor()
+          throws NoSuchFieldException, NoSuchMethodException {
+    Field saslPropsResolverField =
+            SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
+    saslPropsResolverField.setAccessible(true);
+    Field trustedChannelResolverField =
+            SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
+    trustedChannelResolverField.setAccessible(true);
+    Field fallbackToSimpleAuthField =
+            SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
+    fallbackToSimpleAuthField.setAccessible(true);
+    return new SaslAdaptor() {
+
+      @Override
+      public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
+        try {
+          return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
+        try {
+          return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+
+      @Override
+      public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
+        try {
+          return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
+          throws NoSuchMethodException {
+    Method decryptEncryptedDataEncryptionKeyMethod = DFSClient.class
+            .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
+    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
+    return new TransparentCryptoHelper() {
+
+      @Override
+      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+                                       DFSClient client) throws IOException {
+        try {
+          KeyVersion decryptedKey =
+                  (KeyVersion) decryptEncryptedDataEncryptionKeyMethod.invoke(client, feInfo);
+          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+          Encryptor encryptor = cryptoCodec.createEncryptor();
+          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
+          return encryptor;
+        } catch (InvocationTargetException e) {
+          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+          throw new RuntimeException(e.getTargetException());
+        } catch (GeneralSecurityException e) {
+          throw new IOException(e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
+          throws ClassNotFoundException, NoSuchMethodException {
+    Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
+    Method decryptEncryptedDataEncryptionKeyMethod = hdfsKMSUtilCls.getDeclaredMethod(
+            "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
+    decryptEncryptedDataEncryptionKeyMethod.setAccessible(true);
+    return new TransparentCryptoHelper() {
+
+      @Override
+      public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+                                       DFSClient client) throws IOException {
+        try {
+          KeyVersion decryptedKey = (KeyVersion) decryptEncryptedDataEncryptionKeyMethod
+                  .invoke(null, feInfo, client.getKeyProvider());
+          CryptoCodec cryptoCodec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+          Encryptor encryptor = cryptoCodec.createEncryptor();
+          encryptor.init(decryptedKey.getMaterial(), feInfo.getIV());
+          return encryptor;
+        } catch (InvocationTargetException e) {
+          Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+          throw new RuntimeException(e.getTargetException());
+        } catch (GeneralSecurityException e) {
+          throw new IOException(e);
+        } catch (IllegalAccessException e) {
+          throw new RuntimeException(e);
+        }
+      }
+    };
+  }
+
+  private static TransparentCryptoHelper createTransparentCryptoHelper()
+          throws NoSuchMethodException, ClassNotFoundException {
+    try {
+      return createTransparentCryptoHelperWithoutHDFS12396();
+    } catch (NoSuchMethodException e) {
+      LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
+              " should be hadoop version with HDFS-12396", e);
+    }
+    return createTransparentCryptoHelperWithHDFS12396();
+  }
+
+  static {
+    try {
+      SASL_ADAPTOR = createSaslAdaptor();
+      TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+    } catch (Exception e) {
+      String msg = "Couldn't properly initialize access to HDFS internals. Please "
+              + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+              + "HBASE-16110 for more information.";
+      LOG.error(msg, e);
+      throw new Error(msg, e);
+    }
+  }
+
+  /**
+   * Sets user name and password when asked by the client-side SASL object.
+   */
+  private static final class SaslClientCallbackHandler implements CallbackHandler {
+
+    private final char[] password;
+    private final String userName;
+
+    /**
+     * Creates a new SaslClientCallbackHandler.
+     * @param userName SASL user name
+     * @param password SASL password
+     */
+    public SaslClientCallbackHandler(String userName, char[] password) {
+      this.password = password;
+      this.userName = userName;
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+      NameCallback nc = null;
+      PasswordCallback pc = null;
+      RealmCallback rc = null;
+      for (Callback callback : callbacks) {
+        if (callback instanceof RealmChoiceCallback) {
+          continue;
+        } else if (callback instanceof NameCallback) {
+          nc = (NameCallback) callback;
+        } else if (callback instanceof PasswordCallback) {
+          pc = (PasswordCallback) callback;
+        } else if (callback instanceof RealmCallback) {
+          rc = (RealmCallback) callback;
+        } else {
+          throw new UnsupportedCallbackException(callback, "Unrecognized SASL client callback");
+        }
+      }
+      if (nc != null) {
+        nc.setName(userName);
+      }
+      if (pc != null) {
+        pc.setPassword(password);
+      }
+      if (rc != null) {
+        rc.setText(rc.getDefaultText());
+      }
+    }
+  }
+
+  private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
+
+    private final Configuration conf;
+
+    private final Map<String, String> saslProps;
+
+    private final SaslClient saslClient;
+
+    private final int timeoutMs;
+
+    private final Promise<Void> promise;
+
+    private final DFSClient dfsClient;
+
+    private int step = 0;
+
+    public SaslNegotiateHandler(Configuration conf, String username, char[] password,
+                                Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
+                                DFSClient dfsClient) throws SaslException {
+      this.conf = conf;
+      this.saslProps = saslProps;
+      this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
+              SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
+      this.timeoutMs = timeoutMs;
+      this.promise = promise;
+      this.dfsClient = dfsClient;
+    }
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
+      sendSaslMessage(ctx, payload, null);
+    }
+
+    private List<CipherOption> getCipherOptions() throws IOException {
+      // Negotiate cipher suites if configured. Currently, the only supported
+      // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+      // values for future expansion.
+      String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+      if (StringUtils.isBlank(cipherSuites)) {
+        return null;
+      }
+      if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+        throw new IOException(String.format("Invalid cipher suite, %s=%s",
+                DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+      }
+      return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
+    }
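+
+    // A hedged illustration of the configuration read above; the value shown is an assumption,
+    // and AES/CTR/NoPadding is the only suite getCipherOptions accepts:
+    //
+    //   conf.set(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, "AES/CTR/NoPadding");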
+
+    private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+                                 List<CipherOption> options) throws IOException {
+      DataTransferEncryptorMessageProto.Builder builder =
+              DataTransferEncryptorMessageProto.newBuilder();
+      builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+      if (payload != null) {
+        // Was ByteStringer; fixed without using ByteStringer, since it lives in hbase-protocol
+        // and we want to keep that out of hbase-server.
+        builder.setPayload(ByteString.copyFrom(payload));
+      }
+      if (options != null) {
+        builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
+      }
+      DataTransferEncryptorMessageProto proto = builder.build();
+      int size = proto.getSerializedSize();
+      size += CodedOutputStream.computeRawVarint32Size(size);
+      ByteBuf buf = ctx.alloc().buffer(size);
+      proto.writeDelimitedTo(new ByteBufOutputStream(buf));
+      ctx.write(buf);
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+      sendSaslMessage(ctx, new byte[0]);
+      ctx.flush();
+      step++;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    private void check(DataTransferEncryptorMessageProto proto) throws IOException {
+      if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+        dfsClient.clearDataEncryptionKey();
+        throw new InvalidEncryptionKeyException(proto.getMessage());
+      } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+        throw new IOException(proto.getMessage());
+      }
+    }
+
+    private String getNegotiatedQop() {
+      return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+    }
+
+    private boolean isNegotiatedQopPrivacy() {
+      String qop = getNegotiatedQop();
+      return qop != null && "auth-conf".equalsIgnoreCase(qop);
+    }
+
+    private boolean requestedQopContainsPrivacy() {
+      Set<String> requestedQop =
+              ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      return requestedQop.contains("auth-conf");
+    }
+
+    private void checkSaslComplete() throws IOException {
+      if (!saslClient.isComplete()) {
+        throw new IOException("Failed to complete SASL handshake");
+      }
+      Set<String> requestedQop =
+              ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+      String negotiatedQop = getNegotiatedQop();
+      LOG.debug(
+              "Verifying QOP, requested QOP = " + requestedQop + ", negotiated QOP = " + negotiatedQop);
+      if (!requestedQop.contains(negotiatedQop)) {
+        throw new IOException(String.format("SASL handshake completed, but "
+                        + "channel does not have acceptable quality of protection, "
+                        + "requested = %s, negotiated = %s",
+                requestedQop, negotiatedQop));
+      }
+    }
+
+    private boolean useWrap() {
+      String qop = (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+      return qop != null && !"auth".equalsIgnoreCase(qop);
+    }
+
+    private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
+      byte[] inKey = option.getInKey();
+      if (inKey != null) {
+        inKey = saslClient.unwrap(inKey, 0, inKey.length);
+      }
+      byte[] outKey = option.getOutKey();
+      if (outKey != null) {
+        outKey = saslClient.unwrap(outKey, 0, outKey.length);
+      }
+      return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
+              option.getOutIv());
+    }
+
+    private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
+                                         boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+      List<CipherOption> cipherOptions =
+              PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
+      if (cipherOptions == null || cipherOptions.isEmpty()) {
+        return null;
+      }
+      CipherOption cipherOption = cipherOptions.get(0);
+      return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+    }
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+      if (msg instanceof DataTransferEncryptorMessageProto) {
+        DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
+        check(proto);
+        byte[] challenge = proto.getPayload().toByteArray();
+        byte[] response = saslClient.evaluateChallenge(challenge);
+        switch (step) {
+          case 1: {
+            List<CipherOption> cipherOptions = null;
+            if (requestedQopContainsPrivacy()) {
+              cipherOptions = getCipherOptions();
+            }
+            sendSaslMessage(ctx, response, cipherOptions);
+            ctx.flush();
+            step++;
+            break;
+          }
+          case 2: {
+            assert response == null;
+            checkSaslComplete();
+            CipherOption cipherOption =
+                    getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+            ChannelPipeline p = ctx.pipeline();
+            while (p.first() != null) {
+              p.removeFirst();
+            }
+            if (cipherOption != null) {
+              CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
+              p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
+                      new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
+            } else {
+              if (useWrap()) {
+                p.addLast(new SaslWrapHandler(saslClient),
+                        new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+                        new SaslUnwrapHandler(saslClient));
+              }
+            }
+            promise.trySuccess(null);
+            break;
+          }
+          default:
+            throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+        }
+      } else {
+        ctx.fireChannelRead(msg);
+      }
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      promise.tryFailure(cause);
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+      if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+        promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+      } else {
+        super.userEventTriggered(ctx, evt);
+      }
+    }
+  }
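+
+  // Handshake sequence implemented by SaslNegotiateHandler above, summarized for reference:
+  //   1. handlerAdded sends the SASL_TRANSFER_MAGIC_NUMBER followed by an empty initial message.
+  //   2. At step 1, the server challenge is evaluated and the response is sent back, together
+  //      with cipher options when privacy ("auth-conf") was requested.
+  //   3. At step 2, the negotiated QOP is verified, the negotiation handlers are removed, and
+  //      either encrypt/decrypt or SASL wrap/unwrap handlers are installed before the promise
+  //      completes.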
+
+  private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final SaslClient saslClient;
+
+    public SaslUnwrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      saslClient.dispose();
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      msg.skipBytes(4);
+      byte[] b = new byte[msg.readableBytes()];
+      msg.readBytes(b);
+      ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
+    }
+  }
+
+  private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+    private final SaslClient saslClient;
+
+    private CompositeByteBuf cBuf;
+
+    public SaslWrapHandler(SaslClient saslClient) {
+      this.saslClient = saslClient;
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+      cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
+    }
+
+    @Override
+    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+            throws Exception {
+      if (msg instanceof ByteBuf) {
+        ByteBuf buf = (ByteBuf) msg;
+        cBuf.addComponent(buf);
+        cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
+      } else {
+        ctx.write(msg);
+      }
+    }
+
+    @Override
+    public void flush(ChannelHandlerContext ctx) throws Exception {
+      if (cBuf.isReadable()) {
+        byte[] b = new byte[cBuf.readableBytes()];
+        cBuf.readBytes(b);
+        cBuf.discardReadComponents();
+        byte[] wrapped = saslClient.wrap(b, 0, b.length);
+        ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
+        buf.writeInt(wrapped.length);
+        buf.writeBytes(wrapped);
+        ctx.write(buf);
+      }
+      ctx.flush();
+    }
+
+    @Override
+    public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+      cBuf.release();
+      cBuf = null;
+    }
+  }
+
+  private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+    private final Decryptor decryptor;
+
+    public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+            throws GeneralSecurityException, IOException {
+      this.decryptor = codec.createDecryptor();
+      this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
+    }
+
+    @Override
+    protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuf outBuf = ctx.alloc().directBuffer(inBuf.readableBytes());
+      ByteBuffer outBuffer = outBuf.nioBuffer(0, inBuf.readableBytes());
+      decryptor.decrypt(inBuffer, outBuffer);
+      outBuf.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+      ctx.fireChannelRead(outBuf);
+    }
+  }
+
+  private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
+
+    private final Encryptor encryptor;
+
+    public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+            throws GeneralSecurityException, IOException {
+      this.encryptor = codec.createEncryptor();
+      this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
+    }
+
+    @Override
+    protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
+            throws Exception {
+      if (preferDirect) {
+        return ctx.alloc().directBuffer(msg.readableBytes());
+      } else {
+        return ctx.alloc().buffer(msg.readableBytes());
+      }
+    }
+
+    @Override
+    protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+      ByteBuf inBuf;
+      boolean release = false;
+      if (msg.nioBufferCount() == 1) {
+        inBuf = msg;
+      } else {
+        inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+        msg.readBytes(inBuf);
+        release = true;
+      }
+      ByteBuffer inBuffer = inBuf.nioBuffer();
+      ByteBuffer outBuffer = out.nioBuffer(0, inBuf.readableBytes());
+      encryptor.encrypt(inBuffer, outBuffer);
+      out.writerIndex(inBuf.readableBytes());
+      if (release) {
+        inBuf.release();
+      }
+    }
+  }
+
+  private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
+    return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
+            + new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+  }
+
+  private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+    return new String(Base64.encodeBase64(encryptionKey, false), Charsets.UTF_8).toCharArray();
+  }
+
+  private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getIdentifier(), false), Charsets.UTF_8);
+  }
+
+  private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+    return new String(Base64.encodeBase64(blockToken.getPassword(), false), Charsets.UTF_8)
+            .toCharArray();
+  }
+
+  private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
+    Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+    saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+    saslProps.put(Sasl.SERVER_AUTH, "true");
+    saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+    return saslProps;
+  }
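+
+  // For illustration only (the algorithm value is an assumption): with encryptionAlgorithm = "3des"
+  // the map built above becomes
+  //   javax.security.sasl.qop                   -> "auth-conf"
+  //   javax.security.sasl.server.authentication -> "true"
+  //   com.sun.security.sasl.digest.cipher       -> "3des"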
+
+  private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
+                                        String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
+                                        DFSClient dfsClient) {
+    try {
+      channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+              new ProtobufVarint32FrameDecoder(),
+              new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance()),
+              new SaslNegotiateHandler(conf, username, password, saslProps, timeoutMs, saslPromise,
+                      dfsClient));
+    } catch (SaslException e) {
+      saslPromise.tryFailure(e);
+    }
+  }
+
+  static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+                               int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
+                               Promise<Void> saslPromise) throws IOException {
+    SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
+    SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
+    TrustedChannelResolver trustedChannelResolver =
+            SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
+    AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
+    InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+    if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
+      saslPromise.trySuccess(null);
+      return;
+    }
+    DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
+    if (encryptionKey != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+                "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
+              encryptionKeyToPassword(encryptionKey.encryptionKey),
+              createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
+              client);
+    } else if (!UserGroupInformation.isSecurityEnabled()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
+                + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (dnInfo.getXferPort() < 1024) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+                + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with "
+                + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    } else if (saslPropsResolver != null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+                "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
+              buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
+              client);
+    } else {
+      // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
+      // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
+      // edge case.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
+                + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
+      }
+      saslPromise.trySuccess(null);
+    }
+  }
+
+  static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
+          throws IOException {
+    FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
+    if (feInfo == null) {
+      return null;
+    }
+    return TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
+  }
+}
diff --git a/metron-platform/metron-indexing/metron-indexing-common/pom.xml b/metron-platform/metron-indexing/metron-indexing-common/pom.xml
index f118521..49d1605 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/pom.xml
+++ b/metron-platform/metron-indexing/metron-indexing-common/pom.xml
@@ -60,11 +60,6 @@
             <scope>runtime</scope>
         </dependency>
         <dependency>
-            <groupId>org.apache.commons</groupId>
-            <artifactId>commons-math</artifactId>
-            <version>2.2</version>
-        </dependency>
-        <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <version>${global_hbase_version}</version>
@@ -196,6 +191,12 @@
             <scope>test</scope>
             <type>test-jar</type>
         </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-math</artifactId>
+            <version>2.2</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
index 384cdaa..09147b0 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
@@ -212,7 +212,7 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
       close();
 
       if(fs instanceof LocalFileSystem) {
-        outputStream = new FSDataOutputStream(new FileOutputStream(new File(path.toString())));
+        outputStream = new FSDataOutputStream(new FileOutputStream(new File(path.toString())), new FileSystem.Statistics(fs.getScheme()));
         syncHandler = SyncHandlers.LOCAL.getHandler();
       }
       else {
diff --git a/metron-stellar/stellar-common/pom.xml b/metron-stellar/stellar-common/pom.xml
index 584b27c..1380e5a 100644
--- a/metron-stellar/stellar-common/pom.xml
+++ b/metron-stellar/stellar-common/pom.xml
@@ -148,7 +148,7 @@
             <groupId>commons-beanutils</groupId>
             <artifactId>commons-beanutils</artifactId>
             <optional>true</optional>
-            <version>1.8.3</version>
+            <version>1.9.3</version>
         </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
diff --git a/pom.xml b/pom.xml
index ec5dd4f..90de889 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,6 +145,7 @@
                 <global_storm_kafka_version>1.2.3</global_storm_kafka_version>
                 <global_zeppelin_version>0.8.0</global_zeppelin_version>
                 <global_solr_version>7.4.0</global_solr_version>
+                <base_hadoop_version>3.1.1</base_hadoop_version>
                 <global_hbase_version>2.0.2</global_hbase_version>
                 <global_hbase_guava_version>17.0</global_hbase_guava_version>
                 <!-- For Zookeeper/Curator version compatibility see