You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/10/30 16:31:03 UTC
[metron] branch feature/METRON-2088-support-hdp-3.1 updated:
METRON-2232 Upgrade to Hadoop 3.1.1 (mmiklavc via nickwallen) closes
apache/metron#1523
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
new b1ca4f3 METRON-2232 Upgrade to Hadoop 3.1.1 (mmiklavc via nickwallen) closes apache/metron#1523
b1ca4f3 is described below
commit b1ca4f31cee956d9a547af2c83aa2ead898ff9c0
Author: mmiklavc <mi...@gmail.com>
AuthorDate: Wed Oct 30 12:30:41 2019 -0400
METRON-2232 Upgrade to Hadoop 3.1.1 (mmiklavc via nickwallen) closes apache/metron#1523
---
dependencies_with_url.csv | 49 +-
dev-utilities/build-utils/list_dependencies.sh | 2 +-
metron-analytics/metron-maas-service/pom.xml | 16 +-
.../metron/maas/service/ApplicationMaster.java | 1 +
.../service/callback/ContainerRequestListener.java | 4 +
.../apache/metron/maas/submit/ModelSubmission.java | 38 +-
.../metron/maas/service/MaasIntegrationTest.java | 30 +-
metron-analytics/metron-profiler-spark/pom.xml | 27 +
.../metron/rest/service/impl/GrokServiceImpl.java | 4 +-
metron-platform/metron-common/pom.xml | 5 -
.../FanOutOneBlockAsyncDFSOutputHelper.java | 899 +++++++++++++++++++++
.../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 782 ++++++++++++++++++
metron-platform/metron-hbase-server/pom.xml | 28 +-
.../FanOutOneBlockAsyncDFSOutputHelper.java | 899 +++++++++++++++++++++
.../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 783 ++++++++++++++++++
.../EnrichmentCoprocessorIntegrationTest.java | 4 +-
.../metron-hbase/metron-hbase-common/pom.xml | 2 -
.../FanOutOneBlockAsyncDFSOutputHelper.java | 899 +++++++++++++++++++++
.../FanOutOneBlockAsyncDFSOutputSaslHelper.java | 783 ++++++++++++++++++
.../metron-indexing/metron-indexing-common/pom.xml | 11 +-
.../metron/spout/pcap/PartitionHDFSWriter.java | 2 +-
metron-stellar/stellar-common/pom.xml | 2 +-
pom.xml | 1 +
23 files changed, 5218 insertions(+), 53 deletions(-)
diff --git a/dependencies_with_url.csv b/dependencies_with_url.csv
index 623a021..59eb838 100644
--- a/dependencies_with_url.csv
+++ b/dependencies_with_url.csv
@@ -1,5 +1,6 @@
com.jakewharton.fliptables:fliptables:jar:1.0.2:compile,Apache v2,https://github.com/JakeWharton/flip-tables
org.jboss.aesh:aesh:jar:0.66.19:compile,Apache v2,https://github.com/aeshell/aesh
+org.objenesis:objenesis:jar:1.0:compile,Apache v2,http://objenesis.org/
org.objenesis:objenesis:jar:1.2:compile,Apache v2,http://objenesis.org/
org.objenesis:objenesis:jar:2.1:compile,Apache v2,http://objenesis.org/
org.ow2.asm:asm:jar:4.1:compile,BSD,http://asm.ow2.org/
@@ -26,6 +27,7 @@ com.google.protobuf:protobuf-java:jar:2.5.0:compile,New BSD license,http://code.
com.google.protobuf:protobuf-java:jar:2.6.1:compile,New BSD license,http://code.google.com/p/protobuf
com.google.protobuf:protobuf-java:jar:3.1.0:compile,New BSD license,http://code.google.com/p/protobuf
com.jcraft:jsch:jar:0.1.42:compile,BSD,http://www.jcraft.com/jsch/
+com.jcraft:jsch:jar:0.1.54:compile,BSD,http://www.jcraft.com/jsch/
com.jayway.jsonpath:json-path:jar:2.3.0:compile,Apache v2,https://github.com/json-path/JsonPath
com.jayway.jsonpath:json-path:jar:2.4.0:compile,Apache v2,https://github.com/json-path/JsonPath
net.minidev:accessors-smart:jar:1.2:compile,Apache v2,https://github.com/netplex/json-smart-v2
@@ -48,6 +50,7 @@ javax.mail:mail:jar:1.4:compile,Common Development and Distribution License (CDD
javax.mail:mail:jar:1.4.1:compile,Common Development and Distribution License (CDDL) v1.0,https://glassfish.dev.java.net/javaee5/mail/
javax.servlet:javax.servlet-api:jar:3.1.0:compile,CDDL,http://servlet-spec.java.net
javax.ws.rs:javax.ws.rs-api:jar:2.0.1:compile,CDDL 1.1,https://github.com/jax-rs/api
+javax.ws.rs:jsr311-api:jar:1.1.1:compile,CDDL 1.1,https://github.com/javaee/jsr311
javax.xml.bind:jaxb-api:jar:2.2.11:compile,CDDL,http://jaxb.java.net/
javax.xml.bind:jaxb-api:jar:2.2.2:compile,CDDL,https://jaxb.dev.java.net/
javax.xml.bind:jaxb-api:jar:2.3.0:compile,CDDL,https://jaxb.dev.java.net/
@@ -72,8 +75,12 @@ org.clojure:clojure:jar:1.6.0:compile,Eclipse Public License 1.0,http://clojure.
org.clojure:clojure:jar:1.7.0:compile,Eclipse Public License 1.0,http://clojure.org/
org.codehaus.jackson:jackson-jaxrs:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org
org.codehaus.jackson:jackson-jaxrs:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-jaxrs:jar:1.9.2:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
org.codehaus.jackson:jackson-xc:jar:1.8.3:compile,Apache v2,http://jackson.codehaus.org
org.codehaus.jackson:jackson-xc:jar:1.9.13:compile,Apache v2,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-mapper-asl:jar:1.9.2:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-xc:jar:1.9.2:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
org.codehaus.janino:commons-compiler:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino
org.codehaus.janino:janino:jar:3.0.8:compile,New BSD,https://github.com/janino-compiler/janino
org.codehaus.woodstox:stax2-api:jar:3.1.4:compile,The BSD License,http://wiki.fasterxml.com/WoodstoxStax2
@@ -101,11 +108,17 @@ org.scala-lang:scalap:jar:2.11.0:compile,BSD-like,http://www.scala-lang.org/
oro:oro:jar:2.0.8:compile,ASLv2,http://attic.apache.org/projects/jakarta-oro.html
xmlenc:xmlenc:jar:0.52:compile,The BSD License,http://xmlenc.sourceforge.net
asm:asm:jar:3.1:compile,BSD,http://asm.ow2.org/
+com.sun.jersey.contribs:jersey-guice:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
com.sun.jersey.contribs:jersey-guice:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-client:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
com.sun.jersey:jersey-client:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-core:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
com.sun.jersey:jersey-core:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-json:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
com.sun.jersey:jersey-json:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-server:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
com.sun.jersey:jersey-server:jar:1.9:compile,CDDL 1.1,https://jersey.java.net/
+com.sun.jersey:jersey-servlet:jar:1.19:compile,CDDL 1.1,https://jersey.java.net/
com.thoughtworks.paranamer:paranamer:jar:2.3:compile,BSD,https://github.com/paul-hammant/paranamer
javax.servlet.jsp:jsp-api:jar:2.1:runtime,CDDL,http://oracle.com
javax.servlet.jsp:jsp-api:jar:2.1:compile,CDDL,http://oracle.com
@@ -161,6 +174,7 @@ com.codahale.metrics:metrics-core:jar:3.0.2:compile,MIT,https://github.com/codah
com.codahale.metrics:metrics-graphite:jar:3.0.2:compile,MIT,https://github.com/codahale/metrics
com.esotericsoftware.reflectasm:reflectasm:jar:shaded:1.07:compile,BSD,https://github.com/EsotericSoftware/reflectasm
com.fasterxml.jackson.core:jackson-annotations:jar:2.2.3:compile,ASLv2,http://wiki.fasterxml.com/JacksonHome
+com.fasterxml.jackson.core:jackson-annotations:jar:2.7.0:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-annotations:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-annotations:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-annotations:jar:2.9.0:compile,ASLv2,http://github.com/FasterXML/jackson
@@ -169,6 +183,7 @@ com.fasterxml.jackson.core:jackson-core:jar:2.2.3:compile,ASLv2,http://wiki.fast
com.fasterxml.jackson.core:jackson-core:jar:2.6.3:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.6.6:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.7.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
+com.fasterxml.jackson.core:jackson-core:jar:2.7.8:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.8.3:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.9.2:compile,ASLv2,https://github.com/FasterXML/jackson-core
com.fasterxml.jackson.core:jackson-core:jar:2.9.4:compile,ASLv2,https://github.com/FasterXML/jackson-core
@@ -177,6 +192,7 @@ com.fasterxml.jackson.core:jackson-databind:jar:2.2.3:compile,ASLv2,http://wiki.
com.fasterxml.jackson.core:jackson-databind:jar:2.4.3:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.6.7.1:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.7.4:compile,ASLv2,http://github.com/FasterXML/jackson
+com.fasterxml.jackson.core:jackson-databind:jar:2.7.8:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.8.3:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.9.2:compile,ASLv2,http://github.com/FasterXML/jackson
com.fasterxml.jackson.core:jackson-databind:jar:2.9.4:compile,ASLv2,http://github.com/FasterXML/jackson
@@ -196,10 +212,13 @@ com.fasterxml.jackson.datatype:jackson-datatype-jsr310:jar:2.9.5:compile,ASLv2,h
com.fasterxml.jackson.module:jackson-module-parameter-names:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
com.fasterxml.jackson.module:jackson-module-paranamer:jar:2.7.9:compile,ASLv2,https://github.com/FasterXML/jackson-modules-base
com.fasterxml.jackson.module:jackson-module-scala_2.11:jar:2.6.7.1:compile,ASLv2,https://github.com/FasterXML/jackson-module-scala
+com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.7.8:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
com.fasterxml.jackson.module:jackson-module-jaxb-annotations:jar:2.9.5:compile,ASLv2,https://github.com/FasterXML/jackson-modules-java8
com.fasterxml.woodstox:woodstox-core:jar:5.0.3:compile,ASLv2,https://github.com/FasterXML/woodstox
com.fasterxml:classmate:jar:1.3.1:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
com.fasterxml:classmate:jar:1.3.4:compile,ASLv2,http://github.com/cowtowncoder/java-classmate
+com.fasterxml.jackson.jaxrs:jackson-jaxrs-base:jar:2.7.8:compile,ASLv2,https://github.com/FasterXML/jackson-jaxrs-providers
+com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider:jar:2.7.8:compile,ASLv2,https://github.com/FasterXML/jackson-jaxrs-providers
com.google.code.gson:gson:jar:2.2.2:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
com.google.code.gson:gson:jar:2.2.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
com.google.code.gson:gson:jar:2.7:compile,The Apache Software License, Version 2.0,http://code.google.com/p/google-gson/
@@ -211,8 +230,11 @@ com.google.guava:guava:jar:14.0.1:compile,ASLv2,
com.google.guava:guava:jar:16.0.1:compile,ASLv2,
com.google.guava:guava:jar:17.0:compile,ASLv2,
com.google.guava:guava:jar:18.0:compile,ASLv2,
-com.google.inject.extensions:guice-servlet:jar:3.0:compile,ASLv2,
-com.google.inject:guice:jar:3.0:compile,ASLv2,
+com.google.inject.extensions:guice-servlet:jar:3.0:compile,ASLv2,https://github.com/google/guice
+com.google.inject.extensions:guice-servlet:jar:4.0:compile,ASLv2,https://github.com/google/guice
+com.google.inject:guice:jar:3.0:compile,ASLv2,https://github.com/google/guice
+com.google.inject:guice:jar:4.0:compile,ASLv2,https://github.com/google/guice
+com.google.re2j:re2j:jar:1.1:compile,BSD,https://github.com/google/re2j
com.googlecode.disruptor:disruptor:jar:2.10.4:compile,The Apache Software License, Version 2.0,http://code.google.com/p/disruptor/
com.lmax:disruptor:jar:3.3.0:compile,The Apache Software License, Version 2.0,https://github.com/LMAX-Exchange/disruptor/
com.lmax:disruptor:jar:3.3.2:compile,The Apache Software License, Version 2.0,https://github.com/LMAX-Exchange/disruptor/
@@ -234,6 +256,7 @@ commons-beanutils:commons-beanutils-core:jar:1.8.0:provided,ASLv2,http://commons
commons-beanutils:commons-beanutils:jar:1.7.0:compile,ASLv2,
commons-beanutils:commons-beanutils:jar:1.8.3:compile,ASLv2,http://commons.apache.org/beanutils/
commons-beanutils:commons-beanutils:jar:1.9.2:compile,ASLv2,http://commons.apache.org/beanutils/
+commons-beanutils:commons-beanutils:jar:1.9.3:compile,ASLv2,http://commons.apache.org/beanutils/
commons-cli:commons-cli:jar:1.2:compile,ASLv2,http://commons.apache.org/cli/
commons-cli:commons-cli:jar:1.3.1:compile,ASLv2,http://commons.apache.org/proper/commons-cli/
commons-codec:commons-codec:jar:1.10:compile,ASLv2,http://commons.apache.org/proper/commons-codec/
@@ -269,6 +292,7 @@ commons-logging:commons-logging:jar:1.2:compile,ASLv2,http://commons.apache.org/
commons-net:commons-net:jar:2.2:compile,ASLv2,http://commons.apache.org/net/
commons-net:commons-net:jar:3.1:compile,ASLv2,http://commons.apache.org/net/
commons-net:commons-net:jar:3.1:provided,ASLv2,http://commons.apache.org/net/
+commons-net:commons-net:jar:3.6:compile,ASLv2,http://commons.apache.org/net/
commons-pool:commons-pool:jar:1.5.4:compile,ASLv2,http://commons.apache.org/proper/commons-pool/
commons-text:commons-text:jar:1.1:compile,ASLv2,http://commons.apache.org/proper/commons-text/
commons-validator:commons-validator:jar:1.4.0:compile,ASLv2,http://commons.apache.org/validator/
@@ -288,10 +312,11 @@ io.dropwizard.metrics:metrics-graphite:jar:3.1.0:compile,ASLv2,https://github.co
io.dropwizard.metrics:metrics-graphite:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-json:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
io.dropwizard.metrics:metrics-jvm:jar:3.1.5:compile,ASLv2,https://github.com/dropwizard/metrics
-io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2,
-io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2,
-io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2,
-io.netty:netty-all:jar:4.1.23.Final:compile,ASLv2,
+io.netty:netty-all:jar:4.0.23.Final:compile,ASLv2,https://netty.io/
+io.netty:netty-all:jar:4.0.23.Final:provided,ASLv2,https://netty.io/
+io.netty:netty-all:jar:4.0.52.Final:compile,ASLv2,https://netty.io/
+io.netty:netty-all:jar:4.1.17.Final:compile,ASLv2,https://netty.io/
+io.netty:netty-all:jar:4.1.23.Final:compile,ASLv2,https://netty.io/
io.netty:netty:jar:3.6.2.Final:compile,Apache License, Version 2.0,http://netty.io/
io.netty:netty:jar:3.7.0.Final:compile,Apache License, Version 2.0,http://netty.io/
io.netty:netty:jar:3.9.9.Final:compile,Apache License, Version 2.0,http://netty.io/
@@ -312,7 +337,7 @@ net.jpountz.lz4:lz4:jar:1.3.0:compile,The Apache Software License, Version 2.0,h
net.sf.py4j:py4j:jar:0.10.7:compile,,
nl.jqno.equalsverifier:equalsverifier:jar:2.0.2:compile,The Apache Software License, Version 2.0,http://www.jqno.nl/equalsverifier
org.codehaus.jackson:jackson-core-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
-org.codehaus.jackson:jackson-mapper-asl:jar:1.9.13:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
+org.codehaus.jackson:jackson-core-asl:jar:1.9.2:compile,The Apache Software License, Version 2.0,http://jackson.codehaus.org
org.codehaus.woodstox:stax2-api:jar:3.1.4:compile,The BSD License,http://wiki.fasterxml.com/WoodstoxStax2
org.codehaus.woodstox:woodstox-core-asl:jar:4.4.1:compile,ASLv2,http://woodstox.codehaus.org
org.eclipse.jetty:jetty-http:jar:9.3.0.M0:compile,ASLv2,http://www.eclipse.org/jetty
@@ -390,6 +415,7 @@ org.springframework.ldap:spring-ldap-core:jar:2.3.2.RELEASE:compile,ASLv2,https:
org.springframework.security:spring-security-ldap:jar:5.1.1.RELEASE:compile,ASLv2,https://spring.io/projects/spring-security
org.tukaani:xz:jar:1.0:compile,Public Domain,http://tukaani.org/xz/java.html
org.xerial.snappy:snappy-java:jar:1.0.4.1:compile,The Apache Software License, Version 2.0,http://code.google.com/p/snappy-java/
+org.xerial.snappy:snappy-java:jar:1.0.5:compile,The Apache Software License, Version 2.0,http://code.google.com/p/snappy-java/
org.xerial.snappy:snappy-java:jar:1.1.1.7:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
org.xerial.snappy:snappy-java:jar:1.1.2.6:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
org.xerial.snappy:snappy-java:jar:1.1.7.1:compile,The Apache Software License, Version 2.0,https://github.com/xerial/snappy-java
@@ -465,7 +491,9 @@ de.jollyday:jollyday:jar:0.5.2:compile,ASLv2,http://jollyday.sourceforge.net/lic
org.threeten:threeten-extra:jar:1.0:compile,BSD,http://www.threeten.org/threeten-extra/license.html
org.atteo.classindex:classindex:jar:3.3:compile,ASLv2,https://github.com/atteo/classindex
com.squareup.okhttp:okhttp:jar:2.4.0:compile,ASLv2,https://github.com/square/okhttp
+com.squareup.okhttp:okhttp:jar:2.7.5:compile,ASLv2,https://github.com/square/okhttp
com.squareup.okio:okio:jar:1.4.0:compile,ASLv2,https://github.com/square/okhttp
+com.squareup.okio:okio:jar:1.6.0:compile,ASLv2,https://github.com/square/okhttp
org.htrace:htrace-core:jar:3.0.4:compile,ASLv2,http://htrace.incubator.apache.org/
net.byteseek:byteseek:jar:2.0.3:compile,BSD,https://github.com/nishihatapalmer/byteseek
org.springframework.security.kerberos:spring-security-kerberos-client:jar:1.0.1.RELEASE:compile,ASLv2,https://github.com/spring-projects/spring-security-kerberos
@@ -541,6 +569,7 @@ org.sonatype.sisu:sisu-guice:jar:no_aop:3.0.2:compile
org.sonatype.sisu:sisu-inject-bean:jar:2.2.2:compile
org.sonatype.sisu:sisu-inject-plexus:jar:2.2.2:compile
com.zaxxer:HikariCP:jar:2.7.8:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
+com.zaxxer:HikariCP-java7:jar:2.4.12:compile,ASLv2,https://github.com/brettwooldridge/HikariCP
org.hibernate.validator:hibernate-validator:jar:6.0.9.Final:compile,ASLv2,https://github.com/hibernate/hibernate-validator
com.github.palindromicity:simple-syslog:jar:0.0.3:compile,ASLv2,https://github.com/palindromicity/simple-syslog
org.elasticsearch.client:elasticsearch-rest-high-level-client:jar:5.6.14:compile,ASLv2,https://github.com/elastic/elasticsearch/blob/master/LICENSE.txt
@@ -550,6 +579,7 @@ com.vividsolutions:jts:jar:1.13:compile
joda-time:joda-time:jar:2.10:compile
org.elasticsearch:securesm:jar:1.2:compile
com.github.stephenc.jcip:jcip-annotations:jar:1.0-1:compile,ASLv2,http://stephenc.github.io/jcip-annotations/
+com.nimbusds:nimbus-jose-jwt:jar:4.41.1:compile,ASLv2,https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home
com.nimbusds:nimbus-jose-jwt:jar:4.41.2:compile,ASLv2,https://bitbucket.org/connect2id/nimbus-jose-jwt/wiki/Home
tomcat:jasper-compiler:jar:5.5.23:compile,ASLv2,https://tomcat.apache.org/
org.danilopianini:gson-extras:jar:0.2.1:compile,ASLv2,https://github.com/DanySK/gson-extras
@@ -576,3 +606,8 @@ org.glassfish.jersey.core:jersey-server:jar:2.25.1:compile,Eclipse Public Licens
org.glassfish.jersey.media:jersey-media-jaxb:jar:2.25.1:compile,Eclipse Public License 2.0,https://jersey.github.io/
org.glassfish.web:javax.servlet.jsp:jar:2.3.2:compile,Eclipse Public License 2.0,https://jersey.github.io/
org.glassfish:javax.el:jar:3.0.1-b11:compile,Eclipse Public License 2.0,https://jersey.github.io/
+com.cedarsoftware:java-util:jar:1.9.0:compile,ASLv2,https://github.com/jdereg/java-util
+com.cedarsoftware:json-io:jar:2.5.1:compile,ASLv2,https://github.com/jdereg/json-io
+de.ruedigermoeller:fst:jar:2.50:compile,ASLv2,https://github.com/RuedigerMoeller/fast-serialization
+dnsjava:dnsjava:jar:2.1.7:compile,BSD 2-clause,https://github.com/dnsjava/dnsjava
+org.ehcache:ehcache:jar:3.3.1:compile,ASLv2,https://www.ehcache.org/
diff --git a/dev-utilities/build-utils/list_dependencies.sh b/dev-utilities/build-utils/list_dependencies.sh
index 4e81b7b..04f3fdd 100755
--- a/dev-utilities/build-utils/list_dependencies.sh
+++ b/dev-utilities/build-utils/list_dependencies.sh
@@ -16,4 +16,4 @@
# limitations under the License.
#
-{ mvn dependency:list || { echo "ERROR: Failed to run mvn dependency:list" ; exit 1 ; } ; mvn dependency:list -PHDP-2.5.0.0 || { echo "ERROR: Failed to run mvn dependency:list -PHDP-2.5.0.0" ; exit 1 ; } ; } | grep "^\[INFO\] " | awk '{print $2}' | grep -v "org.apache" | grep -v "test" | grep -v "provided" | grep -v "runtime" | grep -v ":system" | sort | uniq
+{ mvn dependency:list || { echo "ERROR: Failed to run mvn dependency:list" ; exit 1 ; } ; mvn dependency:list -PHDP-3.1 || { echo "ERROR: Failed to run mvn dependency:list -PHDP-3.1" ; exit 1 ; } ; } | grep "^\[INFO\] " | awk '{print $2}' | grep -v "org.apache" | grep -v "test" | grep -v "provided" | grep -v "runtime" | grep -v ":system" | sort | uniq
diff --git a/metron-analytics/metron-maas-service/pom.xml b/metron-analytics/metron-maas-service/pom.xml
index 4e81be6..16560ef 100644
--- a/metron-analytics/metron-maas-service/pom.xml
+++ b/metron-analytics/metron-maas-service/pom.xml
@@ -28,7 +28,7 @@
<slf4j.version>1.7.7</slf4j.version>
<storm.hdfs.version>0.1.2</storm.hdfs.version>
<guava.version>${global_guava_version}</guava.version>
- <hadoop.version>2.7.1</hadoop.version>
+ <hadoop.version>${global_hadoop_version}</hadoop.version>
<jackson.version>1.9.13</jackson.version>
</properties>
<dependencies>
@@ -87,12 +87,12 @@
<dependency>
<groupId>com.sun.jersey.contribs</groupId>
<artifactId>jersey-guice</artifactId>
- <version>1.9</version>
+ <version>1.19</version>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
- <version>1.9</version>
+ <version>1.19</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -308,6 +308,16 @@
</excludes>
</filter>
</filters>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+ <shadedPattern>org.apache.metron.guava.${guava.version}</shadedPattern>
+ </relocation>
+ <relocation>
+ <pattern>com.google.thirdparty</pattern>
+ <shadedPattern>org.apache.metron.guava.thirdparty.${guava.version}</shadedPattern>
+ </relocation>
+ </relocations>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.DontIncludeResourceTransformer">
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java
index dbab00f..9a27fc0 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/ApplicationMaster.java
@@ -533,6 +533,7 @@ public class ApplicationMaster {
}
}
else if(request.getAction() == Action.REMOVE) {
+ LOG.info("Removing containers");
listener.removeContainers(request.getNumInstances(), request);
}
}
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/callback/ContainerRequestListener.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/callback/ContainerRequestListener.java
index 0a957e3..daae495 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/callback/ContainerRequestListener.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/service/callback/ContainerRequestListener.java
@@ -66,6 +66,7 @@ public class ContainerRequestListener implements AMRMClientAsync.CallbackHandler
, ServiceDiscoverer serviceDiscoverer
)
{
+ LOG.info("Initializing container request listener");
this.nmClient = nmClient;
this.amRMClient = amRMClient;
this.serviceDiscoverer = serviceDiscoverer;
@@ -74,11 +75,14 @@ public class ContainerRequestListener implements AMRMClientAsync.CallbackHandler
public void removeContainers(int number, ModelRequest request) {
+ LOG.info("Making request to remove container");
int i = 0;
for(Container c : state.getList(request)) {
if(i < number) {
+ LOG.info(String.format("Making request to removing container id: %s, node: %s", c.getId(), c.getNodeId()));
amRMClient.releaseAssignedContainer(c.getId());
nmClient.stopContainerAsync(c.getId(), c.getNodeId());
+ LOG.info(String.format("Done making request to removing container id: %s, node: %s", c.getId(), c.getNodeId()));
}
else {
break;
diff --git a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
index fcae40a..0aac885 100644
--- a/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
+++ b/metron-analytics/metron-maas-service/src/main/java/org/apache/metron/maas/submit/ModelSubmission.java
@@ -20,7 +20,22 @@ package org.apache.metron.maas.submit;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
-import org.apache.commons.cli.*;
+import java.io.BufferedInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.IOUtils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
@@ -30,24 +45,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.metron.maas.config.*;
+import org.apache.metron.maas.config.Action;
+import org.apache.metron.maas.config.MaaSConfig;
+import org.apache.metron.maas.config.Model;
+import org.apache.metron.maas.config.ModelEndpoint;
+import org.apache.metron.maas.config.ModelRequest;
import org.apache.metron.maas.discovery.ServiceDiscoverer;
+import org.apache.metron.maas.queue.Queue;
+import org.apache.metron.maas.queue.ZKQueue;
import org.apache.metron.maas.service.Constants;
import org.apache.metron.maas.service.Log4jPropertyHelper;
import org.apache.metron.maas.util.ConfigUtil;
-import org.apache.metron.maas.queue.Queue;
-import org.apache.metron.maas.queue.ZKQueue;
-
-import java.io.BufferedInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.AbstractMap;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-import java.util.function.Function;
public class ModelSubmission {
public enum ModelSubmissionOptions {
diff --git a/metron-analytics/metron-maas-service/src/test/java/org/apache/metron/maas/service/MaasIntegrationTest.java b/metron-analytics/metron-maas-service/src/test/java/org/apache/metron/maas/service/MaasIntegrationTest.java
index 8752850..3288296 100644
--- a/metron-analytics/metron-maas-service/src/test/java/org/apache/metron/maas/service/MaasIntegrationTest.java
+++ b/metron-analytics/metron-maas-service/src/test/java/org/apache/metron/maas/service/MaasIntegrationTest.java
@@ -16,18 +16,23 @@
* limitations under the License.
*/
package org.apache.metron.maas.service;
-import java.io.*;
+import com.google.common.base.Splitter;
+import com.google.common.collect.Iterables;
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
-
-import com.google.common.base.Splitter;
-import com.google.common.collect.Iterables;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.RetryPolicy;
@@ -35,6 +40,7 @@ import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Shell;
@@ -45,10 +51,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.metron.integration.ComponentRunner;
import org.apache.metron.integration.components.YarnComponent;
import org.apache.metron.integration.components.ZKServerComponent;
-import org.apache.metron.maas.discovery.ServiceDiscoverer;
import org.apache.metron.maas.config.MaaSConfig;
import org.apache.metron.maas.config.Model;
import org.apache.metron.maas.config.ModelEndpoint;
+import org.apache.metron.maas.discovery.ServiceDiscoverer;
import org.apache.metron.maas.queue.ZKQueue;
import org.apache.metron.maas.submit.ModelSubmission;
import org.apache.metron.maas.util.ConfigUtil;
@@ -71,6 +77,7 @@ public class MaasIntegrationTest {
@BeforeClass
public static void setupBeforeClass() throws Exception {
UnitTestHelper.setJavaLoggingLevel(Level.SEVERE);
+ UnitTestHelper.setLog4jLevel(org.apache.log4j.Level.ERROR);
LOG.info("Starting up YARN cluster");
zkServerComponent = new ZKServerComponent();
@@ -180,10 +187,12 @@ public class MaasIntegrationTest {
List<ApplicationReport> apps = yarnClient.getApplications();
if (apps.size() == 0 ) {
Thread.sleep(10);
+ LOG.info("No YARN apps found yet, retrying.");
continue;
}
ApplicationReport appReport = apps.get(0);
if(appReport.getHost().equals("N/A")) {
+ LOG.info("YARN apps found but not ready yet, retrying.");
Thread.sleep(10);
continue;
}
@@ -193,8 +202,10 @@ public class MaasIntegrationTest {
+ appReport.getRpcPort() + "'.";
if (checkHostname(appReport.getHost()) && appReport.getRpcPort() == -1) {
verified = true;
+ LOG.info("Yarn app verified");
}
if (appReport.getYarnApplicationState() == YarnApplicationState.FINISHED) {
+ LOG.info("Yarn app state returned FINISHED");
break;
}
}
@@ -223,14 +234,17 @@ public class MaasIntegrationTest {
try {
List<ModelEndpoint> endpoints = discoverer.getEndpoints(new Model("dummy", "1.0"));
if (endpoints != null && endpoints.size() == 1) {
- LOG.trace("Found endpoints: " + endpoints.get(0));
+ LOG.info("Found endpoints: " + endpoints.get(0));
String output = makeRESTcall(new URL(endpoints.get(0).getEndpoint().getUrl() + "/echo/casey"));
if (output.contains("casey")) {
passed = true;
break;
}
+ } else {
+ LOG.info("Did not find endpoints, retrying");
}
} catch (Exception e) {
+ LOG.info("Rest call failed", e);
}
Thread.sleep(2000);
}
diff --git a/metron-analytics/metron-profiler-spark/pom.xml b/metron-analytics/metron-profiler-spark/pom.xml
index da8356e..8a896cf 100644
--- a/metron-analytics/metron-profiler-spark/pom.xml
+++ b/metron-analytics/metron-profiler-spark/pom.xml
@@ -29,6 +29,27 @@
</properties>
<dependencies>
<dependency>
+ <!-- prevents Spark from pulling in newer versions of jackson
+ (1.9.x) which conflict with metron's ${global_jackson_version} -->
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${global_jackson_version}</version>
+ </dependency>
+ <dependency>
+ <!-- prevents Spark from pulling in newer versions of jackson
+ (1.9.x) which conflict with metron's ${global_jackson_version} -->
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <version>${global_jackson_version}</version>
+ </dependency>
+ <dependency>
+ <!-- prevents Spark from pulling in newer versions of jackson
+ (1.9.x) which conflict with metron's ${global_jackson_version} -->
+ <groupId>com.fasterxml.jackson.module</groupId>
+ <artifactId>jackson-module-scala_2.11</artifactId>
+ <version>${global_jackson_version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${global_spark_version}</version>
@@ -198,6 +219,12 @@
<pattern>com.tdunning</pattern>
<shadedPattern>org.apache.metron.tdunning</shadedPattern>
</relocation>
+ <!-- We have conflicts with Spark's version of Jackson. This avoids that issue by
+ relocating the packages. -->
+ <relocation>
+ <pattern>com.fasterxml.jackson</pattern>
+ <shadedPattern>org.apache.metron.jackson.${global_jackson_version}</shadedPattern>
+ </relocation>
</relocations>
<artifactSet>
<excludes>
diff --git a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
index c43c661..a2ac4c6 100644
--- a/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
+++ b/metron-interface/metron-rest/src/main/java/org/apache/metron/rest/service/impl/GrokServiceImpl.java
@@ -22,7 +22,7 @@ import java.nio.charset.StandardCharsets;
import oi.thekraken.grok.api.Grok;
import oi.thekraken.grok.api.Match;
import org.apache.commons.io.IOUtils;
-import org.apache.directory.api.util.Strings;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
import org.apache.metron.rest.RestException;
import org.apache.metron.rest.model.GrokValidation;
@@ -68,7 +68,7 @@ public class GrokServiceImpl implements GrokService {
if (grokValidation.getPatternLabel() == null) {
throw new RestException("Pattern label is required");
}
- if (Strings.isEmpty(grokValidation.getStatement())) {
+ if (StringUtils.isEmpty(grokValidation.getStatement())) {
throw new RestException("Grok statement is required");
}
Grok grok = new Grok();
diff --git a/metron-platform/metron-common/pom.xml b/metron-platform/metron-common/pom.xml
index 5d6ddaa..b4114cc 100644
--- a/metron-platform/metron-common/pom.xml
+++ b/metron-platform/metron-common/pom.xml
@@ -348,11 +348,6 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>commons-beanutils</groupId>
- <artifactId>commons-beanutils</artifactId>
- <version>1.8.3</version>
- </dependency>
- <dependency>
<groupId>org.reflections</groupId>
<artifactId>reflections</artifactId>
<version>0.9.10</version>
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
new file mode 100644
index 0000000..1a4a218
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -0,0 +1,899 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import com.google.protobuf.CodedOutputStream;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputHelper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);
+
+ private FanOutOneBlockAsyncDFSOutputHelper() {
+ }
+
+ public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
+
+ public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
+ // use pooled allocator for performance.
+ private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
+
+ // copied from DFSPacket since it is package private.
+ public static final long HEART_BEAT_SEQNO = -1L;
+
+ // Timeouts for communicating with DataNode for streaming writes/reads
+ public static final int READ_TIMEOUT = 60 * 1000;
+
+ private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
+
+ // helper class for getting Status from PipelineAckProto. In hadoop 2.6 or before, there is a
+ // getStatus method, and for hadoop 2.7 or after, the status is retrieved from a flag. The flag
+ // is either read from the proto directly, or combined from the reply field of the proto and an
+ // ECN object. See createPipelineAckStatusGetter for more details.
+ private interface PipelineAckStatusGetter {
+ Status get(PipelineAckProto ack);
+ }
+
+ private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
+
+ // StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and o.a.h.fs in hadoop 2.7. So here
+ // we need to use reflection to set it. See createStorageTypeSetter for more details.
+ private interface StorageTypeSetter {
+ OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
+ }
+
+ private static final StorageTypeSetter STORAGE_TYPE_SETTER;
+
+ // helper class for calling the addBlock method on the namenode. There is an addBlockFlags
+ // parameter for hadoop 2.8 or later. See createBlockAdder for more details.
+ private interface BlockAdder {
+
+ LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+ throws IOException;
+ }
+
+ private static final BlockAdder BLOCK_ADDER;
+
+ private interface LeaseManager {
+
+ void begin(DFSClient client, long inodeId);
+
+ void end(DFSClient client, long inodeId);
+ }
+
+ private static final LeaseManager LEASE_MANAGER;
+
+ // This is used to terminate a recoverFileLease call when FileSystem is already closed.
+ // isClientRunning is not public so we need to use reflection.
+ private interface DFSClientAdaptor {
+
+ boolean isClientRunning(DFSClient client);
+ }
+
+ private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
+
+ // helper class for converting protos.
+ private interface PBHelper {
+
+ ExtendedBlockProto convert(ExtendedBlock b);
+
+ TokenProto convert(Token<?> tok);
+ }
+
+ private static final PBHelper PB_HELPER;
+
+ // helper class for creating data checksum.
+ private interface ChecksumCreater {
+ DataChecksum createChecksum(DFSClient client);
+ }
+
+ private static final ChecksumCreater CHECKSUM_CREATER;
+
+ // helper class for creating files.
+ private interface FileCreator {
+ default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
+ String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
+ short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
+ throws Exception {
+ try {
+ return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
+ replication, blockSize, supportedVersions);
+ } catch (InvocationTargetException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception) e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ };
+
+ Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
+ EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
+ CryptoProtocolVersion[] supportedVersions) throws Exception;
+ }
+
+ private static final FileCreator FILE_CREATOR;
+
+ private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+ Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
+ isClientRunningMethod.setAccessible(true);
+ return new DFSClientAdaptor() {
+
+ @Override
+ public boolean isClientRunning(DFSClient client) {
+ try {
+ return (Boolean) isClientRunningMethod.invoke(client);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+ Method beginFileLeaseMethod =
+ DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
+ beginFileLeaseMethod.setAccessible(true);
+ Method endFileLeaseMethod = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+ endFileLeaseMethod.setAccessible(true);
+ return new LeaseManager() {
+
+ @Override
+ public void begin(DFSClient client, long inodeId) {
+ try {
+ beginFileLeaseMethod.invoke(client, inodeId, null);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void end(DFSClient client, long inodeId) {
+ try {
+ endFileLeaseMethod.invoke(client, inodeId);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+ throws NoSuchMethodException {
+ Method getFlagListMethod = PipelineAckProto.class.getMethod("getFlagList");
+ @SuppressWarnings("rawtypes")
+ Class<? extends Enum> ecnClass;
+ try {
+ ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+ .asSubclass(Enum.class);
+ } catch (ClassNotFoundException e) {
+ String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+ "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
+ @SuppressWarnings("unchecked")
+ Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+ Method getReplyMethod = PipelineAckProto.class.getMethod("getReply", int.class);
+ Method combineHeaderMethod =
+ PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+ Method getStatusFromHeaderMethod =
+ PipelineAck.class.getMethod("getStatusFromHeader", int.class);
+ return new PipelineAckStatusGetter() {
+
+ @Override
+ public Status get(PipelineAckProto ack) {
+ try {
+ @SuppressWarnings("unchecked")
+ List<Integer> flagList = (List<Integer>) getFlagListMethod.invoke(ack);
+ Integer headerFlag;
+ if (flagList.isEmpty()) {
+ Status reply = (Status) getReplyMethod.invoke(ack, 0);
+ headerFlag = (Integer) combineHeaderMethod.invoke(null, disabledECN, reply);
+ } else {
+ headerFlag = flagList.get(0);
+ }
+ return (Status) getStatusFromHeaderMethod.invoke(null, headerFlag);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+ throws NoSuchMethodException {
+ Method getStatusMethod = PipelineAckProto.class.getMethod("getStatus", int.class);
+ return new PipelineAckStatusGetter() {
+
+ @Override
+ public Status get(PipelineAckProto ack) {
+ try {
+ return (Status) getStatusMethod.invoke(ack, 0);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+ throws NoSuchMethodException {
+ try {
+ return createPipelineAckStatusGetter27();
+ } catch (NoSuchMethodException e) {
+ LOG.debug("Can not get expected method " + e.getMessage() +
+ ", this usually because your Hadoop is pre 2.7.0, " +
+ "try the methods in Hadoop 2.6.x instead.");
+ }
+ return createPipelineAckStatusGetter26();
+ }
+
+ private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+ Method setStorageTypeMethod =
+ OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+ ImmutableMap.Builder<String, StorageTypeProto> builder = ImmutableMap.builder();
+ for (StorageTypeProto storageTypeProto : StorageTypeProto.values()) {
+ builder.put(storageTypeProto.name(), storageTypeProto);
+ }
+ ImmutableMap<String, StorageTypeProto> name2ProtoEnum = builder.build();
+ return new StorageTypeSetter() {
+
+ @Override
+ public OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType) {
+ Object protoEnum = name2ProtoEnum.get(storageType.name());
+ try {
+ setStorageTypeMethod.invoke(builder, protoEnum);
+ } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ return builder;
+ }
+ };
+ }
+
+ private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+ for (Method method : ClientProtocol.class.getMethods()) {
+ if (method.getName().equals("addBlock")) {
+ Method addBlockMethod = method;
+ Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+ if (paramTypes[paramTypes.length - 1] == String[].class) {
+ return new BlockAdder() {
+
+ @Override
+ public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+ String[] favoredNodes) throws IOException {
+ try {
+ return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+ excludeNodes, fileId, favoredNodes);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ } else {
+ return new BlockAdder() {
+
+ @Override
+ public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+ String[] favoredNodes) throws IOException {
+ try {
+ return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+ excludeNodes, fileId, favoredNodes, null);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ }
+ }
+ throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+ }
+
+ private static PBHelper createPBHelper() throws NoSuchMethodException {
+ Class<?> helperClass;
+ String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
+ try {
+ helperClass = Class.forName(clazzName);
+ } catch (ClassNotFoundException e) {
+ helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+ LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
+ helperClass.toString() + " instead.");
+ }
+ Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+ Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+ return new PBHelper() {
+
+ @Override
+ public ExtendedBlockProto convert(ExtendedBlock b) {
+ try {
+ return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public TokenProto convert(Token<?> tok) {
+ try {
+ return (TokenProto) convertTokenMethod.invoke(null, tok);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
+ throws NoSuchMethodException {
+ for (Method method : confClass.getMethods()) {
+ if (method.getName().equals("createChecksum")) {
+ Method createChecksumMethod = method;
+ return new ChecksumCreater() {
+
+ @Override
+ public DataChecksum createChecksum(DFSClient client) {
+ try {
+ return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client),
+ (Object) null);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ }
+ throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+ }
+
+ private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
+ throws NoSuchMethodException {
+ Method createChecksumMethod = confClass.getDeclaredMethod("createChecksum");
+ createChecksumMethod.setAccessible(true);
+ return new ChecksumCreater() {
+
+ @Override
+ public DataChecksum createChecksum(DFSClient client) {
+ try {
+ return (DataChecksum) createChecksumMethod.invoke(getConfMethod.invoke(client));
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ private static ChecksumCreater createChecksumCreater()
+ throws NoSuchMethodException, ClassNotFoundException {
+ Method getConfMethod = DFSClient.class.getMethod("getConf");
+ try {
+ return createChecksumCreater28(getConfMethod,
+ Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf"));
+ } catch (ClassNotFoundException e) {
+ LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
+ }
+ return createChecksumCreater27(getConfMethod,
+ Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
+ }
+
+ private static FileCreator createFileCreator3() throws NoSuchMethodException {
+ Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+ String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+ CryptoProtocolVersion[].class, String.class);
+
+ return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+ supportedVersions) -> {
+ return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
+ createParent, replication, blockSize, supportedVersions, null);
+ };
+ }
+
+ private static FileCreator createFileCreator2() throws NoSuchMethodException {
+ Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+ String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+ CryptoProtocolVersion[].class);
+
+ return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+ supportedVersions) -> {
+ return (HdfsFileStatus) createMethod.invoke(instance, src, masked, clientName, flag,
+ createParent, replication, blockSize, supportedVersions);
+ };
+ }
+
+ private static FileCreator createFileCreator() throws NoSuchMethodException {
+ try {
+ return createFileCreator3();
+ } catch (NoSuchMethodException e) {
+ LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
+ }
+ return createFileCreator2();
+ }
+
+ // cancel the processing if DFSClient is already closed.
+ static final class CancelOnClose implements CancelableProgressable {
+
+ private final DFSClient client;
+
+ public CancelOnClose(DFSClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public boolean progress() {
+ return DFS_CLIENT_ADAPTOR.isClientRunning(client);
+ }
+ }
+
+ static {
+ try {
+ PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
+ STORAGE_TYPE_SETTER = createStorageTypeSetter();
+ BLOCK_ADDER = createBlockAdder();
+ LEASE_MANAGER = createLeaseManager();
+ DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+ PB_HELPER = createPBHelper();
+ CHECKSUM_CREATER = createChecksumCreater();
+ FILE_CREATOR = createFileCreator();
+ } catch (Exception e) {
+ String msg = "Couldn't properly initialize access to HDFS internals. Please " +
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+ "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
+ }
+
+ static void beginFileLease(DFSClient client, long inodeId) {
+ LEASE_MANAGER.begin(client, inodeId);
+ }
+
+ static void endFileLease(DFSClient client, long inodeId) {
+ LEASE_MANAGER.end(client, inodeId);
+ }
+
+ static DataChecksum createChecksum(DFSClient client) {
+ return CHECKSUM_CREATER.createChecksum(client);
+ }
+
+ static Status getStatus(PipelineAckProto ack) {
+ return PIPELINE_ACK_STATUS_GETTER.get(ack);
+ }
+
+ private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
+ Promise<Channel> promise, int timeoutMs) {
+ channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+ new ProtobufVarint32FrameDecoder(),
+ new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
+ new SimpleChannelInboundHandler<BlockOpResponseProto>() {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
+ throws Exception {
+ Status pipelineStatus = resp.getStatus();
+ if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
+ throw new IOException("datanode " + dnInfo + " is restarting");
+ }
+ String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
+ if (resp.getStatus() != Status.SUCCESS) {
+ if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException("Got access token error" + ", status message " +
+ resp.getMessage() + ", " + logInfo);
+ } else {
+ throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
+ ", status message " + resp.getMessage() + ", " + logInfo);
+ }
+ }
+ // success
+ ChannelPipeline p = ctx.pipeline();
+ for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
+ // do not remove all handlers because we may have wrap or unwrap handlers at the head
+ // of the pipeline.
+ if (handler instanceof IdleStateHandler) {
+ break;
+ }
+ }
+ // Disable auto read here. Enable it after we setup the streaming pipeline in
+ // FanOutOneBlockAsyncDFSOutput.
+ ctx.channel().config().setAutoRead(false);
+ promise.trySuccess(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+ promise
+ .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ promise.tryFailure(cause);
+ }
+ });
+ }
+
+ private static void requestWriteBlock(Channel channel, Enum<?> storageType,
+ OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
+ OpWriteBlockProto proto = STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
+ int protoLen = proto.getSerializedSize();
+ ByteBuf buffer =
+ channel.alloc().buffer(3 + CodedOutputStream.computeRawVarint32Size(protoLen) + protoLen);
+ buffer.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+ buffer.writeByte(Op.WRITE_BLOCK.code);
+ proto.writeDelimitedTo(new ByteBufOutputStream(buffer));
+ channel.writeAndFlush(buffer);
+ }
+
+ private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+ Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+ DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
+ throws IOException {
+ Promise<Void> saslPromise = channel.eventLoop().newPromise();
+ trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslPromise);
+ saslPromise.addListener(new FutureListener<Void>() {
+
+ @Override
+ public void operationComplete(Future<Void> future) throws Exception {
+ if (future.isSuccess()) {
+ // setup response processing pipeline first, then send request.
+ processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
+ requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
+ } else {
+ promise.tryFailure(future.cause());
+ }
+ }
+ });
+ }
+
+ /**
+  * Opens one connection per datanode of {@code locatedBlock} and, for each connection that is
+  * established, hands the channel to {@code initialize} to continue the write-block handshake.
+  * <p>
+  * The returned list holds one future per datanode, in the order of
+  * {@code locatedBlock.getLocations()}. A future is failed here when its connect attempt fails;
+  * otherwise its completion is left to {@code initialize}.
+  */
+ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
+     String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
+     BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
+     Class<? extends Channel> channelClass) {
+   Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
+   DatanodeInfo[] targets = locatedBlock.getLocations();
+   boolean useHostname =
+       conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+   int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+
+   // The header carries a copy of the block whose length is set to the located block's size.
+   ExtendedBlock headerBlock = new ExtendedBlock(locatedBlock.getBlock());
+   headerBlock.setNumBytes(locatedBlock.getBlockSize());
+   BaseHeaderProto.Builder baseHeader = BaseHeaderProto.newBuilder()
+       .setBlock(PB_HELPER.convert(headerBlock))
+       .setToken(PB_HELPER.convert(locatedBlock.getBlockToken()));
+   ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
+       .setBaseHeader(baseHeader)
+       .setClientName(clientName)
+       .build();
+
+   // One request template is shared by every datanode connection.
+   OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+       .setHeader(header)
+       .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+       .setPipelineSize(1)
+       .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+       .setMaxBytesRcvd(maxBytesRcvd)
+       .setLatestGenerationStamp(latestGS)
+       .setRequestedChecksum(DataTransferProtoUtil.toProto(summer))
+       .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+
+   List<Future<Channel>> pending = new ArrayList<>(targets.length);
+   for (int idx = 0; idx < targets.length; idx++) {
+     DatanodeInfo dnInfo = targets[idx];
+     Enum<?> storageType = storageTypes[idx];
+     Promise<Channel> promise = eventLoopGroup.next().newPromise();
+     pending.add(promise);
+     String dnAddr = dnInfo.getXferAddr(useHostname);
+     Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(channelClass)
+         .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
+
+           @Override
+           protected void initChannel(Channel ch) throws Exception {
+             // we need to get the remote address of the channel so we can only move on after
+             // channel connected. Leave an empty implementation here because netty does not allow
+             // a null handler.
+           }
+         });
+     ChannelFuture connecting = bootstrap.connect(NetUtils.createSocketAddr(dnAddr));
+     connecting.addListener(new ChannelFutureListener() {
+
+       @Override
+       public void operationComplete(ChannelFuture connected) throws Exception {
+         if (!connected.isSuccess()) {
+           promise.tryFailure(connected.cause());
+           return;
+         }
+         initialize(conf, connected.channel(), dnInfo, storageType, writeBlockProtoBuilder,
+           timeoutMs, client, locatedBlock.getBlockToken(), promise);
+       }
+     });
+   }
+   return pending;
+ }
+
+ /**
+  * Wraps any failure other than a {@code RemoteException} that is raised while calling create on
+  * the namenode.
+  */
+ public static class NameNodeException extends IOException {
+
+   private static final long serialVersionUID = 3143237406477095390L;
+
+   /**
+    * @param cause the failure raised by the create call
+    */
+   public NameNodeException(Throwable cause) {
+     super(cause);
+   }
+ }
+
+ /**
+  * Creates {@code src} on the namenode, adds its first block, connects to every datanode of that
+  * block and wraps the result in a {@link FanOutOneBlockAsyncDFSOutput}, retrying on failure.
+  * <p>
+  * Retry rules, per attempt:
+  * <ul>
+  * <li>A failure of the create call itself is never retried: a {@code RemoteException} is
+  * rethrown as is and anything else is wrapped in a {@link NameNodeException}.</li>
+  * <li>A {@code RemoteException} raised afterwards is retried only when
+  * {@link #shouldRetryCreate} accepts it and the retry budget is not used up; otherwise it is
+  * rethrown unwrapped.</li>
+  * <li>Any other {@code IOException} is retried until the budget is used up. The next attempt
+  * overwrites the file left by the failed one and is preceded by a back-off sleep.</li>
+  * </ul>
+  * A datanode whose connection failed is added to the exclude list passed to the next addBlock
+  * call. When an attempt does not succeed, every channel that did connect is closed and the
+  * file lease is ended.
+  * @throws InterruptedIOException if the thread is interrupted during the back-off sleep; the
+  *           thread's interrupt status is restored before throwing
+  */
+ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
+     boolean overwrite, boolean createParent, short replication, long blockSize,
+     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+   Configuration conf = dfs.getConf();
+   FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+   DFSClient client = dfs.getClient();
+   String clientName = client.getClientName();
+   ClientProtocol namenode = client.getNamenode();
+   int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+     DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+   DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+   for (int retry = 0;; retry++) {
+     HdfsFileStatus stat;
+     try {
+       stat = FILE_CREATOR.create(namenode, src,
+         FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+         new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+         createParent, replication, blockSize, CryptoProtocolVersion.supported());
+     } catch (Exception e) {
+       if (e instanceof RemoteException) {
+         throw (RemoteException) e;
+       } else {
+         throw new NameNodeException(e);
+       }
+     }
+     beginFileLease(client, stat.getFileId());
+     boolean succ = false;
+     LocatedBlock locatedBlock = null;
+     List<Future<Channel>> futureList = null;
+     try {
+       DataChecksum summer = createChecksum(client);
+       locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
+         excludesNodes, stat.getFileId(), null);
+       List<Channel> datanodeList = new ArrayList<>();
+       futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+         PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
+       for (int i = 0, n = futureList.size(); i < n; i++) {
+         try {
+           datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+         } catch (Exception e) {
+           // exclude the broken DN next time
+           excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+           throw e;
+         }
+       }
+       Encryptor encryptor = createEncryptor(conf, stat, client);
+       FanOutOneBlockAsyncDFSOutput output =
+         new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+           stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+       succ = true;
+       return output;
+     } catch (RemoteException e) {
+       LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+       if (shouldRetryCreate(e)) {
+         if (retry >= createMaxRetries) {
+           throw e.unwrapRemoteException();
+         }
+       } else {
+         throw e.unwrapRemoteException();
+       }
+     } catch (IOException e) {
+       LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+       if (retry >= createMaxRetries) {
+         throw e;
+       }
+       // overwrite the old broken file.
+       overwrite = true;
+       try {
+         Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+       } catch (InterruptedException ie) {
+         // Restore the interrupt status so callers further up can still observe it, and keep the
+         // original exception as the cause instead of discarding it.
+         Thread.currentThread().interrupt();
+         InterruptedIOException iioe = new InterruptedIOException(
+           "Interrupted while waiting to retry creating " + src);
+         iioe.initCause(ie);
+         throw iioe;
+       }
+     } finally {
+       if (!succ) {
+         if (futureList != null) {
+           // Some connects may still be in flight, so close each channel from a listener.
+           for (Future<Channel> f : futureList) {
+             f.addListener(new FutureListener<Channel>() {
+
+               @Override
+               public void operationComplete(Future<Channel> future) throws Exception {
+                 if (future.isSuccess()) {
+                   future.getNow().close();
+                 }
+               }
+             });
+           }
+         }
+         endFileLease(client, stat.getFileId());
+       }
+     }
+   }
+ }
+
+ /**
+  * Create a {@link FanOutOneBlockAsyncDFSOutput}. This method may block, so do not call it
+  * inside an {@link EventLoop}.
+  * <p>
+  * The path is resolved through a {@link FileSystemLinkResolver}; resolving into a different
+  * file system is not supported and raises {@link UnsupportedOperationException}.
+  */
+ public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
+     boolean overwrite, boolean createParent, short replication, long blockSize,
+     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+   FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput> resolver =
+     new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
+
+       @Override
+       public FanOutOneBlockAsyncDFSOutput doCall(Path resolved)
+           throws IOException, UnresolvedLinkException {
+         String src = resolved.toUri().getPath();
+         return createOutput(dfs, src, overwrite, createParent, replication, blockSize,
+           eventLoopGroup, channelClass);
+       }
+
+       @Override
+       public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path resolved) throws IOException {
+         throw new UnsupportedOperationException();
+       }
+     };
+   return resolver.resolve(dfs, f);
+ }
+
+ public static boolean shouldRetryCreate(RemoteException e) {
+ // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
+ // For exceptions other than this, we just throw it out. This is same with
+ // DFSOutputStream.newStreamForCreate.
+ return e.getClassName().endsWith("RetryStartFileException");
+ }
+
+ /**
+  * Asks the namenode to complete {@code src}, retrying without limit until it reports success
+  * or the lease turns out to be expired.
+  * <p>
+  * On success the file lease is ended. An expired lease is logged and ends the attempts. Any
+  * other failure is logged and retried after a pause that depends on the retry count.
+  */
+ static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
+     ExtendedBlock block, long fileId) {
+   int retry = 0;
+   while (true) {
+     try {
+       boolean completed = namenode.complete(src, clientName, block, fileId);
+       if (completed) {
+         endFileLease(client, fileId);
+         return;
+       }
+       LOG.warn("complete file " + src + " not finished, retry = " + retry);
+     } catch (RemoteException e) {
+       boolean leaseExpired = e.unwrapRemoteException() instanceof LeaseExpiredException;
+       if (leaseExpired) {
+         LOG.warn("lease for file " + src + " is expired, give up", e);
+         return;
+       }
+       LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+     } catch (Exception e) {
+       LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+     }
+     sleepIgnoreInterrupt(retry);
+     retry++;
+   }
+ }
+
+ /**
+  * Sleeps for the pause time computed from a base of 100 and the given retry count.
+  * <p>
+  * An interrupt only cuts the sleep short: it is swallowed and the interrupt status is not
+  * restored.
+  */
+ static void sleepIgnoreInterrupt(int retry) {
+   long pauseMs = ConnectionUtils.getPauseTime(100, retry);
+   try {
+     Thread.sleep(pauseMs);
+   } catch (InterruptedException ignored) {
+     // Deliberately ignored, as the method name says.
+   }
+ }
+}
diff --git a/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
new file mode 100644
index 0000000..b4664d4
--- /dev/null
+++ b/metron-platform/metron-data-management/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -0,0 +1,782 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.Decryptor;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
+
+ private FanOutOneBlockAsyncDFSOutputSaslHelper() { // private: not instantiable from outside
+ }
+
+ private static final String SERVER_NAME = "0"; // server name given to Sasl.createSaslClient
+ private static final String PROTOCOL = "hdfs"; // protocol given to Sasl.createSaslClient
+ private static final String MECHANISM = "DIGEST-MD5"; // the only mechanism offered to Sasl.createSaslClient
+ private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF; // written as the first 4 bytes of the negotiation
+ private static final String NAME_DELIMITER = " "; // joins the parts of the user name built from an encryption key
+
+ private interface SaslAdaptor { // accessors for state held inside a SaslDataTransferClient
+
+ TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient); // createSaslAdaptor reads the "trustedChannelResolver" field reflectively
+
+ SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient); // createSaslAdaptor reads the "saslPropsResolver" field reflectively
+
+ AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient); // createSaslAdaptor reads the "fallbackToSimpleAuth" field reflectively
+ }
+
+ private static final SaslAdaptor SASL_ADAPTOR; // assigned once in the static initializer
+
+ private interface TransparentCryptoHelper { // builds an Encryptor from a file's encryption info
+
+ Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client) // both implementations obtain the decrypted key through a reflective call
+ throws IOException;
+ }
+
+ private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER; // assigned once in the static initializer
+
+ /**
+  * Builds the {@link SaslAdaptor} by making three private fields of
+  * {@link SaslDataTransferClient} accessible through reflection.
+  * @throws NoSuchFieldException if one of the expected fields is not declared by the class
+  */
+ private static SaslAdaptor createSaslAdaptor()
+     throws NoSuchFieldException, NoSuchMethodException {
+   final Field propsResolver = openSaslClientField("saslPropsResolver");
+   final Field channelResolver = openSaslClientField("trustedChannelResolver");
+   final Field fallbackFlag = openSaslClientField("fallbackToSimpleAuth");
+   return new SaslAdaptor() {
+
+     @Override
+     public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
+       return (TrustedChannelResolver) readSaslClientField(channelResolver, saslClient);
+     }
+
+     @Override
+     public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
+       return (SaslPropertiesResolver) readSaslClientField(propsResolver, saslClient);
+     }
+
+     @Override
+     public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
+       return (AtomicBoolean) readSaslClientField(fallbackFlag, saslClient);
+     }
+   };
+ }
+
+ /** Looks up a declared field of {@link SaslDataTransferClient} and makes it accessible. */
+ private static Field openSaslClientField(String fieldName) throws NoSuchFieldException {
+   Field field = SaslDataTransferClient.class.getDeclaredField(fieldName);
+   field.setAccessible(true);
+   return field;
+ }
+
+ /** Reads {@code field} from {@code owner}, rethrowing an access failure unchecked. */
+ private static Object readSaslClientField(Field field, SaslDataTransferClient owner) {
+   try {
+     return field.get(owner);
+   } catch (IllegalAccessException e) {
+     throw new RuntimeException(e);
+   }
+ }
+
+ /**
+  * Builds a {@link TransparentCryptoHelper} that obtains the decrypted key by reflectively
+  * invoking {@code DFSClient.decryptEncryptedDataEncryptionKey(FileEncryptionInfo)} on the
+  * client instance.
+  * @throws NoSuchMethodException if {@link DFSClient} does not declare that method
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
+     throws NoSuchMethodException {
+   final Method decryptKey = DFSClient.class
+     .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
+   decryptKey.setAccessible(true);
+   return (conf, feInfo, client) -> {
+     try {
+       KeyVersion key = (KeyVersion) decryptKey.invoke(client, feInfo);
+       Encryptor result = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()).createEncryptor();
+       result.init(key.getMaterial(), feInfo.getIV());
+       return result;
+     } catch (InvocationTargetException e) {
+       // Surface the real failure: as is when it is an IOException or unchecked, wrapped
+       // otherwise.
+       Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+       throw new RuntimeException(e.getTargetException());
+     } catch (GeneralSecurityException e) {
+       throw new IOException(e);
+     } catch (IllegalAccessException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Builds a {@link TransparentCryptoHelper} that obtains the decrypted key by reflectively
+  * invoking the static
+  * {@code HdfsKMSUtil.decryptEncryptedDataEncryptionKey(FileEncryptionInfo, KeyProvider)}
+  * with the client's key provider.
+  * @throws ClassNotFoundException if {@code org.apache.hadoop.hdfs.HdfsKMSUtil} is absent
+  * @throws NoSuchMethodException if that class does not declare the method
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
+     throws ClassNotFoundException, NoSuchMethodException {
+   Class<?> hdfsKMSUtilCls = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
+   final Method decryptKey = hdfsKMSUtilCls.getDeclaredMethod(
+     "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
+   decryptKey.setAccessible(true);
+   return (conf, feInfo, client) -> {
+     try {
+       // Static method, hence the null receiver.
+       KeyVersion key = (KeyVersion) decryptKey.invoke(null, feInfo, client.getKeyProvider());
+       Encryptor result = CryptoCodec.getInstance(conf, feInfo.getCipherSuite()).createEncryptor();
+       result.init(key.getMaterial(), feInfo.getIV());
+       return result;
+     } catch (InvocationTargetException e) {
+       // Surface the real failure: as is when it is an IOException or unchecked, wrapped
+       // otherwise.
+       Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+       throw new RuntimeException(e.getTargetException());
+     } catch (GeneralSecurityException e) {
+       throw new IOException(e);
+     } catch (IllegalAccessException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Picks the {@link TransparentCryptoHelper} that matches the Hadoop version on the classpath:
+  * the {@code DFSClient}-based variant first and, when {@code DFSClient} lacks the method, the
+  * {@code HdfsKMSUtil}-based one.
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelper()
+     throws NoSuchMethodException, ClassNotFoundException {
+   try {
+     return createTransparentCryptoHelperWithoutHDFS12396();
+   } catch (NoSuchMethodException e) {
+     LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
+       " should be hadoop version with HDFS-12396", e);
+     return createTransparentCryptoHelperWithHDFS12396();
+   }
+ }
+
+ static {
+ try {
+ SASL_ADAPTOR = createSaslAdaptor();
+ TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+ } catch (Exception e) {
+ String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ + "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
+ }
+
+ /**
+  * Supplies the user name, the password and the default realm text when the client-side SASL
+  * object asks for them.
+  */
+ private static final class SaslClientCallbackHandler implements CallbackHandler {
+
+   private final String userName;
+   private final char[] password;
+
+   /**
+    * Creates a new SaslClientCallbackHandler.
+    * @param userName SASL user name
+    * @param password SASL password
+    */
+   public SaslClientCallbackHandler(String userName, char[] password) {
+     this.userName = userName;
+     this.password = password;
+   }
+
+   /**
+    * Answers name, password and realm callbacks; realm-choice callbacks are skipped.
+    * <p>
+    * All callbacks are classified before any of them is answered, so nothing is set when an
+    * unsupported callback is present. If a type occurs several times, only the last one is
+    * answered.
+    * @throws UnsupportedCallbackException for any other callback type
+    */
+   @Override
+   public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+     NameCallback nameRequest = null;
+     PasswordCallback passwordRequest = null;
+     RealmCallback realmRequest = null;
+     for (Callback requested : callbacks) {
+       if (requested instanceof RealmChoiceCallback) {
+         continue;
+       }
+       if (requested instanceof NameCallback) {
+         nameRequest = (NameCallback) requested;
+         continue;
+       }
+       if (requested instanceof PasswordCallback) {
+         passwordRequest = (PasswordCallback) requested;
+         continue;
+       }
+       if (requested instanceof RealmCallback) {
+         realmRequest = (RealmCallback) requested;
+         continue;
+       }
+       throw new UnsupportedCallbackException(requested, "Unrecognized SASL client callback");
+     }
+     if (nameRequest != null) {
+       nameRequest.setName(userName);
+     }
+     if (passwordRequest != null) {
+       passwordRequest.setPassword(password);
+     }
+     if (realmRequest != null) {
+       realmRequest.setText(realmRequest.getDefaultText());
+     }
+   }
+ }
+
+ /**
+  * Drives the client side of the SASL handshake with a datanode and reports the outcome through
+  * the promise given to the constructor.
+  * <p>
+  * When added to the pipeline it sends the magic number and an empty first message. The first
+  * reply is answered with the evaluated challenge (plus cipher options when privacy was
+  * requested). The second reply finishes the handshake: every handler is removed from the
+  * pipeline, encryption or wrap handlers are installed if needed, and the promise succeeds.
+  * <p>
+  * The promise fails on any exception, on a reader-idle timeout, and when the channel becomes
+  * inactive before the handshake has finished.
+  */
+ private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
+
+   private final Configuration conf;
+
+   private final Map<String, String> saslProps;
+
+   private final SaslClient saslClient;
+
+   private final int timeoutMs;
+
+   private final Promise<Void> promise;
+
+   private final DFSClient dfsClient;
+
+   // Number of messages sent so far; selects how the next reply is handled.
+   private int step = 0;
+
+   public SaslNegotiateHandler(Configuration conf, String username, char[] password,
+       Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
+       DFSClient dfsClient) throws SaslException {
+     this.conf = conf;
+     this.saslProps = saslProps;
+     this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
+       SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
+     this.timeoutMs = timeoutMs;
+     this.promise = promise;
+     this.dfsClient = dfsClient;
+   }
+
+   private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
+     sendSaslMessage(ctx, payload, null);
+   }
+
+   /**
+    * Returns the cipher options to offer, or {@code null} when no cipher suite is configured.
+    * @throws IOException if a suite other than AES/CTR/NoPadding is configured
+    */
+   private List<CipherOption> getCipherOptions() throws IOException {
+     // Negotiate cipher suites if configured. Currently, the only supported
+     // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+     // values for future expansion.
+     String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+     if (StringUtils.isBlank(cipherSuites)) {
+       return null;
+     }
+     if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+       throw new IOException(String.format("Invalid cipher suite, %s=%s",
+         DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+     }
+     return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
+   }
+
+   /** Writes (without flushing) a length-delimited SUCCESS message to the channel. */
+   private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+       List<CipherOption> options) throws IOException {
+     DataTransferEncryptorMessageProto.Builder builder =
+       DataTransferEncryptorMessageProto.newBuilder();
+     builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+     if (payload != null) {
+       // Was ByteStringer; fix w/o using ByteStringer. It's in hbase-protocol
+       // and we want to keep that out of hbase-server.
+       builder.setPayload(ByteString.copyFrom(payload));
+     }
+     if (options != null) {
+       builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
+     }
+     DataTransferEncryptorMessageProto proto = builder.build();
+     // Size the buffer for the message plus its varint length prefix.
+     int size = proto.getSerializedSize();
+     size += CodedOutputStream.computeRawVarint32Size(size);
+     ByteBuf buf = ctx.alloc().buffer(size);
+     proto.writeDelimitedTo(new ByteBufOutputStream(buf));
+     ctx.write(buf);
+   }
+
+   @Override
+   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+     ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+     sendSaslMessage(ctx, new byte[0]);
+     ctx.flush();
+     step++;
+   }
+
+   @Override
+   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+     try {
+       saslClient.dispose();
+     } finally {
+       // This handler is removed once the handshake succeeds, so getting here means the
+       // connection went away mid-negotiation. Fail the promise so that nobody waits on it
+       // forever; this is a no-op if it has already been completed.
+       promise.tryFailure(new IOException("Connection closed during SASL negotiation"));
+     }
+   }
+
+   /** Turns an error status reported by the peer into an exception. */
+   private void check(DataTransferEncryptorMessageProto proto) throws IOException {
+     if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+       dfsClient.clearDataEncryptionKey();
+       throw new InvalidEncryptionKeyException(proto.getMessage());
+     } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+       throw new IOException(proto.getMessage());
+     }
+   }
+
+   private String getNegotiatedQop() {
+     return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+   }
+
+   private boolean isNegotiatedQopPrivacy() {
+     String qop = getNegotiatedQop();
+     return qop != null && "auth-conf".equalsIgnoreCase(qop);
+   }
+
+   /** Splits the comma-separated QOP list of the SASL properties into a set. */
+   private Set<String> requestedQops() {
+     return ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+   }
+
+   private boolean requestedQopContainsPrivacy() {
+     return requestedQops().contains("auth-conf");
+   }
+
+   /**
+    * Verifies that the handshake has completed and that the negotiated QOP is one of the
+    * requested ones.
+    */
+   private void checkSaslComplete() throws IOException {
+     if (!saslClient.isComplete()) {
+       throw new IOException("Failed to complete SASL handshake");
+     }
+     Set<String> requestedQop = requestedQops();
+     String negotiatedQop = getNegotiatedQop();
+     LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}", requestedQop,
+       negotiatedQop);
+     if (!requestedQop.contains(negotiatedQop)) {
+       throw new IOException(String.format("SASL handshake completed, but "
+         + "channel does not have acceptable quality of protection, "
+         + "requested = %s, negotiated = %s",
+         requestedQop, negotiatedQop));
+     }
+   }
+
+   /** Wrapping is needed for every negotiated QOP other than plain "auth". */
+   private boolean useWrap() {
+     String qop = getNegotiatedQop();
+     return qop != null && !"auth".equalsIgnoreCase(qop);
+   }
+
+   /** Returns a copy of {@code option} whose in and out keys went through SASL unwrap. */
+   private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
+     byte[] inKey = option.getInKey();
+     if (inKey != null) {
+       inKey = saslClient.unwrap(inKey, 0, inKey.length);
+     }
+     byte[] outKey = option.getOutKey();
+     if (outKey != null) {
+       outKey = saslClient.unwrap(outKey, 0, outKey.length);
+     }
+     return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
+       option.getOutIv());
+   }
+
+   /**
+    * Picks the first cipher option of the reply, unwrapping its keys when the negotiated QOP is
+    * privacy. Returns {@code null} when the reply carries no cipher option.
+    */
+   private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
+       boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+     List<CipherOption> cipherOptions =
+       PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
+     if (cipherOptions == null || cipherOptions.isEmpty()) {
+       return null;
+     }
+     CipherOption cipherOption = cipherOptions.get(0);
+     return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+   }
+
+   @Override
+   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+     if (msg instanceof DataTransferEncryptorMessageProto) {
+       DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
+       check(proto);
+       byte[] challenge = proto.getPayload().toByteArray();
+       byte[] response = saslClient.evaluateChallenge(challenge);
+       switch (step) {
+         case 1: {
+           // First reply: answer the challenge, offering ciphers if privacy was requested.
+           List<CipherOption> cipherOptions = null;
+           if (requestedQopContainsPrivacy()) {
+             cipherOptions = getCipherOptions();
+           }
+           sendSaslMessage(ctx, response, cipherOptions);
+           ctx.flush();
+           step++;
+           break;
+         }
+         case 2: {
+           // Second reply: the handshake must be complete now.
+           assert response == null;
+           checkSaslComplete();
+           CipherOption cipherOption =
+             getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+           // Drop every negotiation handler, this one included.
+           ChannelPipeline p = ctx.pipeline();
+           while (p.first() != null) {
+             p.removeFirst();
+           }
+           if (cipherOption != null) {
+             CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
+             p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
+               new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
+           } else {
+             if (useWrap()) {
+               p.addLast(new SaslWrapHandler(saslClient),
+                 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+                 new SaslUnwrapHandler(saslClient));
+             }
+           }
+           promise.trySuccess(null);
+           break;
+         }
+         default:
+           throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+       }
+     } else {
+       ctx.fireChannelRead(msg);
+     }
+   }
+
+   @Override
+   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+     promise.tryFailure(cause);
+   }
+
+   @Override
+   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+     if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+       promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+     } else {
+       super.userEventTriggered(ctx, evt);
+     }
+   }
+ }
+
+ /**
+  * Inbound handler that SASL-unwraps each incoming frame and passes the plain bytes on.
+  * <p>
+  * Each frame is expected to start with a 4-byte length field, which is skipped before
+  * unwrapping.
+  */
+ private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+   private final SaslClient saslClient;
+
+   public SaslUnwrapHandler(SaslClient saslClient) {
+     this.saslClient = saslClient;
+   }
+
+   @Override
+   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+     try {
+       saslClient.dispose();
+     } finally {
+       // Keep the event moving: handlers added behind this one would otherwise never learn
+       // that the connection is gone.
+       ctx.fireChannelInactive();
+     }
+   }
+
+   @Override
+   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+     // Skip the 4-byte length field at the start of the frame.
+     msg.skipBytes(4);
+     byte[] wrapped = new byte[msg.readableBytes()];
+     msg.readBytes(wrapped);
+     ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(wrapped, 0, wrapped.length)));
+   }
+ }
+
+ /**
+  * Outbound handler that collects written buffers and, on flush, SASL-wraps everything
+  * collected so far into one frame prefixed with its 4-byte length.
+  * <p>
+  * NOTE(review): the promise of a buffered {@code ByteBuf} write is not completed by this
+  * handler; callers should not rely on it.
+  */
+ private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+   private final SaslClient saslClient;
+
+   // Buffers written since the last flush; created in handlerAdded, released in handlerRemoved.
+   private CompositeByteBuf cBuf;
+
+   public SaslWrapHandler(SaslClient saslClient) {
+     this.saslClient = saslClient;
+   }
+
+   @Override
+   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+     cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
+   }
+
+   @Override
+   public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+     // Release on removal rather than by overriding close(): an override that does not forward
+     // the close keeps it from ever reaching the transport, and it is never invoked at all when
+     // the remote side closes the connection.
+     if (cBuf != null) {
+       cBuf.release();
+       cBuf = null;
+     }
+   }
+
+   @Override
+   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+       throws Exception {
+     if (msg instanceof ByteBuf) {
+       ByteBuf buf = (ByteBuf) msg;
+       cBuf.addComponent(buf);
+       // addComponent does not move the writer index, so advance it by hand.
+       cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
+     } else {
+       // Pass the caller's promise along so that it is completed with the write.
+       ctx.write(msg, promise);
+     }
+   }
+
+   @Override
+   public void flush(ChannelHandlerContext ctx) throws Exception {
+     if (cBuf.isReadable()) {
+       byte[] plain = new byte[cBuf.readableBytes()];
+       cBuf.readBytes(plain);
+       cBuf.discardReadComponents();
+       byte[] wrapped = saslClient.wrap(plain, 0, plain.length);
+       ByteBuf frame = ctx.alloc().ioBuffer(4 + wrapped.length);
+       frame.writeInt(wrapped.length);
+       frame.writeBytes(wrapped);
+       ctx.write(frame);
+     }
+     ctx.flush();
+   }
+ }
+
+ /**
+  * Inbound handler that decrypts every incoming buffer and passes the plain bytes on in a newly
+  * allocated direct buffer.
+  */
+ private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+   private final Decryptor decryptor;
+
+   public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+       throws GeneralSecurityException, IOException {
+     this.decryptor = codec.createDecryptor();
+     // The decryptor gets its own copy of the IV.
+     this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
+   }
+
+   @Override
+   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+     // The decryptor works on a single NIO buffer, so a message that is not backed by exactly
+     // one is first copied into a temporary direct buffer.
+     final boolean copied = msg.nioBufferCount() != 1;
+     final ByteBuf inBuf = copied ? ctx.alloc().directBuffer(msg.readableBytes()) : msg;
+     ByteBuf outBuf = null;
+     try {
+       if (copied) {
+         msg.readBytes(inBuf);
+       }
+       int length = inBuf.readableBytes();
+       ByteBuffer inBuffer = inBuf.nioBuffer();
+       outBuf = ctx.alloc().directBuffer(length);
+       ByteBuffer outBuffer = outBuf.nioBuffer(0, length);
+       decryptor.decrypt(inBuffer, outBuffer);
+       outBuf.writerIndex(length);
+     } catch (Throwable t) {
+       // Nobody downstream will ever see the output buffer, so release it here.
+       if (outBuf != null) {
+         outBuf.release();
+       }
+       throw t;
+     } finally {
+       // The temporary copy is ours to release, whether decryption worked or not.
+       if (copied) {
+         inBuf.release();
+       }
+     }
+     ctx.fireChannelRead(outBuf);
+   }
+ }
+
+ /**
+  * Outbound encoder that encrypts every written buffer into an output buffer of the same
+  * length.
+  */
+ private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
+
+   private final Encryptor encryptor;
+
+   public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+       throws GeneralSecurityException, IOException {
+     this.encryptor = codec.createEncryptor();
+     // The encryptor gets its own copy of the IV.
+     this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
+   }
+
+   @Override
+   protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
+       throws Exception {
+     // The output is exactly as long as the input.
+     if (preferDirect) {
+       return ctx.alloc().directBuffer(msg.readableBytes());
+     } else {
+       return ctx.alloc().buffer(msg.readableBytes());
+     }
+   }
+
+   @Override
+   protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+     // The encryptor works on a single NIO buffer, so a message that is not backed by exactly
+     // one is first copied into a temporary direct buffer.
+     final boolean copied = msg.nioBufferCount() != 1;
+     final ByteBuf inBuf = copied ? ctx.alloc().directBuffer(msg.readableBytes()) : msg;
+     try {
+       if (copied) {
+         msg.readBytes(inBuf);
+       }
+       int length = inBuf.readableBytes();
+       ByteBuffer inBuffer = inBuf.nioBuffer();
+       ByteBuffer outBuffer = out.nioBuffer(0, length);
+       encryptor.encrypt(inBuffer, outBuffer);
+       out.writerIndex(length);
+     } finally {
+       // The temporary copy is ours to release, whether encryption worked or not.
+       if (copied) {
+         inBuf.release();
+       }
+     }
+   }
+ }
+
+ /**
+  * Builds the SASL user name from the key id, the block pool id and the Base64-encoded nonce
+  * of the given encryption key, joined with {@code NAME_DELIMITER}.
+  */
+ private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
+   String encodedNonce =
+       new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+   return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
+       + encodedNonce;
+ }
+
+ /** Returns the Base64 encoding (unchunked) of the raw key bytes as a character array. */
+ private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+   byte[] encoded = Base64.encodeBase64(encryptionKey, false);
+   String asText = new String(encoded, Charsets.UTF_8);
+   return asText.toCharArray();
+ }
+
+ /** Returns the Base64 encoding (unchunked) of the block token's identifier. */
+ private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
+   byte[] encodedIdentifier = Base64.encodeBase64(blockToken.getIdentifier(), false);
+   return new String(encodedIdentifier, Charsets.UTF_8);
+ }
+
+ /** Returns the Base64 encoding (unchunked) of the block token's password as characters. */
+ private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+   byte[] encodedPassword = Base64.encodeBase64(blockToken.getPassword(), false);
+   String asText = new String(encodedPassword, Charsets.UTF_8);
+   return asText.toCharArray();
+ }
+
+ private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
+ Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+ saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+ saslProps.put(Sasl.SERVER_AUTH, "true");
+ saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+ return saslProps;
+ }
+
+ /**
+  * Appends the handlers that carry out the SASL negotiation to the channel's pipeline. If
+  * creating the negotiate handler fails with a {@link SaslException}, the promise is failed.
+  */
+ private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
+     String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
+     DFSClient dfsClient) {
+   try {
+     // Built in the same order in which they are appended to the pipeline.
+     IdleStateHandler idleHandler = new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS);
+     ProtobufVarint32FrameDecoder frameDecoder = new ProtobufVarint32FrameDecoder();
+     ProtobufDecoder protoDecoder =
+         new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance());
+     SaslNegotiateHandler negotiateHandler = new SaslNegotiateHandler(conf, username, password,
+         saslProps, timeoutMs, saslPromise, dfsClient);
+     channel.pipeline().addLast(idleHandler, frameDecoder, protoDecoder, negotiateHandler);
+   } catch (SaslException saslFailure) {
+     saslPromise.tryFailure(saslFailure);
+   }
+ }
+
+ /**
+  * Decides whether a SASL handshake is needed for the channel and either starts it or completes
+  * {@code saslPromise} successfully right away. The checks run in this order: trusted channel,
+  * data encryption key present, security disabled, privileged transfer port, fallback to simple
+  * auth, SASL properties resolver configured; if none applies the handshake is skipped.
+  */
+ static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+     int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
+     Promise<Void> saslPromise) throws IOException {
+   SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
+   SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
+   TrustedChannelResolver trustedChannelResolver =
+       SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
+   AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
+   InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+
+   if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
+   if (encryptionKey != null) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(
+         "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
+       encryptionKeyToPassword(encryptionKey.encryptionKey),
+       createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
+       client);
+     return;
+   }
+
+   if (!UserGroupInformation.isSecurityEnabled()) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
+           + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (dnInfo.getXferPort() < 1024) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in secured configuration with "
+           + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in secured configuration with "
+           + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (saslPropsResolver != null) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(
+         "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
+       buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
+       client);
+     return;
+   }
+
+   // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
+   // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
+   // edge case.
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
+         + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
+   }
+   saslPromise.trySuccess(null);
+ }
+
+ /**
+  * Creates an encryptor for the file described by {@code stat}.
+  *
+  * @return the encryptor, or {@code null} when the file status carries no encryption info
+  */
+ static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
+     throws IOException {
+   FileEncryptionInfo encryptionInfo = stat.getFileEncryptionInfo();
+   return encryptionInfo == null
+       ? null
+       : TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, encryptionInfo, client);
+ }
+}
diff --git a/metron-platform/metron-hbase-server/pom.xml b/metron-platform/metron-hbase-server/pom.xml
index 0f6bd83..120954e 100644
--- a/metron-platform/metron-hbase-server/pom.xml
+++ b/metron-platform/metron-hbase-server/pom.xml
@@ -34,6 +34,12 @@
<guava.version>${global_hbase_guava_version}</guava.version>
</properties>
+ <!--
+ Metron HBase server is meant to be deployed as a single uber jar. It's expected that
+ all runtime dependencies will be included in the jar or be provided by the environment
+ in which this jar will be run.
+ -->
+
<dependencies>
<!-- Metron -->
@@ -58,15 +64,33 @@
</exclusions>
</dependency>
<dependency>
+ <!-- We depend on this for some Zookeeper upload operations -->
+ <groupId>org.apache.metron</groupId>
+ <artifactId>stellar-common</artifactId>
+ <version>${project.parent.version}</version>
+ <classifier>uber</classifier>
+ </dependency>
+ <dependency>
<groupId>org.apache.metron</groupId>
<artifactId>metron-enrichment-common</artifactId>
<version>${project.parent.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
- <artifactId>stellar-common</artifactId>
+ <artifactId>metron-hbase-common</artifactId>
<version>${project.parent.version}</version>
- <classifier>uber</classifier>
+ <exclusions>
+ <exclusion>
+ <groupId>*</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<!-- HBase -->
diff --git a/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
new file mode 100644
index 0000000..e82a46c
--- /dev/null
+++ b/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -0,0 +1,899 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputHelper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(
+ FanOutOneBlockAsyncDFSOutputHelper.class);
+
+ // Static helper class; not meant to be instantiated.
+ private FanOutOneBlockAsyncDFSOutputHelper() {
+ }
+
+ // Configuration key; judging by the name, the max number of retries when creating an output.
+ public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
+
+ // Presumably the default used when the key above is not set -- its use is not visible here.
+ public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
+ // use pooled allocator for performance.
+ private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
+
+ // copied from DFSPacket since it is package private.
+ public static final long HEART_BEAT_SEQNO = -1L;
+
+ // Timeouts for communicating with DataNode for streaming writes/reads
+ // (presumably milliseconds, i.e. 60 seconds -- TODO confirm against the users of this constant).
+ public static final int READ_TIMEOUT = 60 * 1000;
+
+ private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
+
+ // Helper for getting the Status from a PipelineAckProto. In hadoop 2.6 or before, there is a
+ // getStatus method; in hadoop 2.7 or after, the status is retrieved from a flag. The flag may
+ // be read from the proto directly, or be combined from the reply field of the proto and an ECN
+ // object. See createPipelineAckStatusGetter for more details.
+ private interface PipelineAckStatusGetter {
+ // Returns the status carried by the given ack.
+ Status get(PipelineAckProto ack);
+ }
+
+ // Assigned once in the static initializer of this class.
+ private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
+
+ // The StorageType enum is placed under o.a.h.hdfs in hadoop 2.6 and under o.a.h.fs in hadoop
+ // 2.7, so we need to use reflection to set it. See createStorageTypeSetter for more details.
+ private interface StorageTypeSetter {
+ // Sets the storage type on the given builder and returns a builder to continue with.
+ OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
+ }
+
+ // Assigned once in the static initializer of this class.
+ private static final StorageTypeSetter STORAGE_TYPE_SETTER;
+
+ // Helper for calling the addBlock method on the namenode. There is an additional addBlockFlags
+ // parameter in hadoop 2.8 or later. See createBlockAdder for more details.
+ private interface BlockAdder {
+
+ // Asks the namenode to add a block to the file src; parameters mirror ClientProtocol.addBlock
+ // without the addBlockFlags argument.
+ LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+ throws IOException;
+ }
+
+ // Assigned once in the static initializer of this class.
+ private static final BlockAdder BLOCK_ADDER;
+
+ // Helper for beginning and ending a file lease on a DFSClient. See createLeaseManager, which
+ // binds these to DFSClient's beginFileLease/endFileLease methods through reflection.
+ private interface LeaseManager {
+
+ // Begins the file lease for the given inode id.
+ void begin(DFSClient client, long inodeId);
+
+ // Ends the file lease for the given inode id.
+ void end(DFSClient client, long inodeId);
+ }
+
+ // Assigned once in the static initializer of this class.
+ private static final LeaseManager LEASE_MANAGER;
+
+ // This is used to terminate a recoverFileLease call when the FileSystem is already closed.
+ // isClientRunning is not public, so we need to use reflection.
+ private interface DFSClientAdaptor {
+
+ // Returns the result of DFSClient's isClientRunning for the given client.
+ boolean isClientRunning(DFSClient client);
+ }
+
+ // Assigned once in the static initializer of this class.
+ private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
+
+ // Helper for converting HDFS objects to their protobuf counterparts. See createPBHelper.
+ private interface PBHelper {
+
+ // Converts an ExtendedBlock to its proto form.
+ ExtendedBlockProto convert(ExtendedBlock b);
+
+ // Converts a Token to its proto form.
+ TokenProto convert(Token<?> tok);
+ }
+
+ // Assigned once in the static initializer of this class.
+ private static final PBHelper PB_HELPER;
+
+ // Helper for creating a data checksum. See createChecksumCreater.
+ private interface ChecksumCreater {
+ // Creates a DataChecksum from the configuration of the given client.
+ DataChecksum createChecksum(DFSClient client);
+ }
+
+ // Assigned once in the static initializer of this class.
+ private static final ChecksumCreater CHECKSUM_CREATER;
+
+ // helper class for creating files.
+ private interface FileCreator {
+ default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
+ String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
+ short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
+ throws Exception {
+ try {
+ return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
+ replication, blockSize, supportedVersions);
+ } catch (InvocationTargetException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception) e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ };
+
+ Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
+ EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
+ CryptoProtocolVersion[] supportedVersions) throws Exception;
+ }
+
+ private static final FileCreator FILE_CREATOR;
+
+ /**
+  * Builds an adaptor that calls DFSClient's non-public {@code isClientRunning} method through
+  * reflection.
+  */
+ private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+   Method runningCheck = DFSClient.class.getDeclaredMethod("isClientRunning");
+   runningCheck.setAccessible(true);
+   return client -> {
+     try {
+       return (Boolean) runningCheck.invoke(client);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Builds a lease manager that calls DFSClient's {@code beginFileLease} and
+  * {@code endFileLease} methods through reflection.
+  */
+ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+   Method beginLease =
+       DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
+   beginLease.setAccessible(true);
+   Method endLease = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+   endLease.setAccessible(true);
+
+   return new LeaseManager() {
+
+     @Override
+     public void begin(DFSClient client, long inodeId) {
+       try {
+         // null is passed as the DFSOutputStream argument.
+         beginLease.invoke(client, inodeId, null);
+       } catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException(e);
+       }
+     }
+
+     @Override
+     public void end(DFSClient client, long inodeId) {
+       try {
+         endLease.invoke(client, inodeId);
+       } catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException(e);
+       }
+     }
+   };
+ }
+
+ /**
+  * Builds the status getter that works on the flag list of the ack. When the flag list is empty
+  * the header is combined from the first reply and the DISABLED ECN value; otherwise the first
+  * flag is used as the header. The status is then extracted from that header.
+  */
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+     throws NoSuchMethodException {
+   Method flagListGetter = PipelineAckProto.class.getMethod("getFlagList");
+   @SuppressWarnings("rawtypes")
+   Class<? extends Enum> ecnClass;
+   try {
+     ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+         .asSubclass(Enum.class);
+   } catch (ClassNotFoundException e) {
+     String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
+         "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+         "HBASE-16110 for more information.";
+     LOG.error(msg, e);
+     throw new Error(msg, e);
+   }
+   @SuppressWarnings("unchecked")
+   Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+   Method replyGetter = PipelineAckProto.class.getMethod("getReply", int.class);
+   Method headerCombiner =
+       PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+   Method statusExtractor =
+       PipelineAck.class.getMethod("getStatusFromHeader", int.class);
+
+   return ack -> {
+     try {
+       @SuppressWarnings("unchecked")
+       List<Integer> flags = (List<Integer>) flagListGetter.invoke(ack);
+       Integer header = flags.isEmpty()
+           ? (Integer) headerCombiner.invoke(null, disabledECN,
+               (Status) replyGetter.invoke(ack, 0))
+           : flags.get(0);
+       return (Status) statusExtractor.invoke(null, header);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Builds the status getter that calls {@code PipelineAckProto.getStatus(0)} through
+  * reflection.
+  */
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+     throws NoSuchMethodException {
+   Method statusGetter = PipelineAckProto.class.getMethod("getStatus", int.class);
+   return ack -> {
+     try {
+       return (Status) statusGetter.invoke(ack, 0);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Tries the flag-based getter first and falls back to the {@code getStatus}-based one when a
+  * required method is missing.
+  */
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+     throws NoSuchMethodException {
+   try {
+     return createPipelineAckStatusGetter27();
+   } catch (NoSuchMethodException missing) {
+     LOG.debug("Can not get expected method " + missing.getMessage() +
+         ", this usually because your Hadoop is pre 2.7.0, " +
+         "try the methods in Hadoop 2.6.x instead.");
+     return createPipelineAckStatusGetter26();
+   }
+ }
+
+ /**
+  * Builds a setter that maps a storage type enum to the {@link StorageTypeProto} constant with
+  * the same name and sets it on the builder through reflection.
+  */
+ private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+   Method storageTypeSetterMethod =
+       OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+   ImmutableMap.Builder<String, StorageTypeProto> mapBuilder = ImmutableMap.builder();
+   for (StorageTypeProto protoConstant : StorageTypeProto.values()) {
+     mapBuilder.put(protoConstant.name(), protoConstant);
+   }
+   ImmutableMap<String, StorageTypeProto> protoByName = mapBuilder.build();
+
+   return (protoBuilder, storageType) -> {
+     // Looked up by name; stays null when no proto constant has that name.
+     Object protoEnum = protoByName.get(storageType.name());
+     try {
+       storageTypeSetterMethod.invoke(protoBuilder, protoEnum);
+     } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+     return protoBuilder;
+   };
+ }
+
+ private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+ for (Method method : ClientProtocol.class.getMethods()) {
+ if (method.getName().equals("addBlock")) {
+ Method addBlockMethod = method;
+ Class<?>[] paramTypes = addBlockMethod.getParameterTypes();
+ if (paramTypes[paramTypes.length - 1] == String[].class) {
+ return new BlockAdder() {
+
+ @Override
+ public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+ String[] favoredNodes) throws IOException {
+ try {
+ return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+ excludeNodes, fileId, favoredNodes);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ } else {
+ return new BlockAdder() {
+
+ @Override
+ public LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId,
+ String[] favoredNodes) throws IOException {
+ try {
+ return (LocatedBlock) addBlockMethod.invoke(namenode, src, clientName, previous,
+ excludeNodes, fileId, favoredNodes, null);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ } catch (InvocationTargetException e) {
+ Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+ }
+ }
+ throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+ }
+
+ private static PBHelper createPBHelper() throws NoSuchMethodException {
+ Class<?> helperClass;
+ String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
+ try {
+ helperClass = Class.forName(clazzName);
+ } catch (ClassNotFoundException e) {
+ helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+ LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
+ helperClass.toString() + " instead.");
+ }
+ Method convertEBMethod = helperClass.getMethod("convert", ExtendedBlock.class);
+ Method convertTokenMethod = helperClass.getMethod("convert", Token.class);
+ return new PBHelper() {
+
+ @Override
+ public ExtendedBlockProto convert(ExtendedBlock b) {
+ try {
+ return (ExtendedBlockProto) convertEBMethod.invoke(null, b);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public TokenProto convert(Token<?> tok) {
+ try {
+ return (TokenProto) convertTokenMethod.invoke(null, tok);
+ } catch (IllegalAccessException | InvocationTargetException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ /**
+  * Builds a checksum creator around the first public {@code createChecksum} method of
+  * {@code confClass}; the method is invoked with a single {@code null} argument.
+  *
+  * @throws NoSuchMethodException if confClass has no public method named createChecksum
+  */
+ private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
+     throws NoSuchMethodException {
+   for (Method candidate : confClass.getMethods()) {
+     if (!candidate.getName().equals("createChecksum")) {
+       continue;
+     }
+     return client -> {
+       try {
+         return (DataChecksum) candidate.invoke(getConfMethod.invoke(client),
+           (Object) null);
+       } catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException(e);
+       }
+     };
+   }
+   throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+ }
+
+ /**
+  * Builds a checksum creator around the no-argument {@code createChecksum} method declared on
+  * {@code confClass}, which is made accessible first.
+  */
+ private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
+     throws NoSuchMethodException {
+   Method checksumFactory = confClass.getDeclaredMethod("createChecksum");
+   checksumFactory.setAccessible(true);
+   return client -> {
+     try {
+       Object clientConf = getConfMethod.invoke(client);
+       return (DataChecksum) checksumFactory.invoke(clientConf);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Uses {@code DfsClientConf} when that class can be loaded and falls back to the nested
+  * {@code DFSClient$Conf} class otherwise.
+  */
+ private static ChecksumCreater createChecksumCreater()
+     throws NoSuchMethodException, ClassNotFoundException {
+   Method confGetter = DFSClient.class.getMethod("getConf");
+   try {
+     Class<?> newConfClass = Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf");
+     return createChecksumCreater28(confGetter, newConfClass);
+   } catch (ClassNotFoundException e) {
+     LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
+     Class<?> oldConfClass = Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf");
+     return createChecksumCreater27(confGetter, oldConfClass);
+   }
+ }
+
+ /**
+  * Builds a file creator around the {@code ClientProtocol.create} overload that takes a
+  * trailing {@code String} parameter, for which {@code null} is passed.
+  */
+ private static FileCreator createFileCreator3() throws NoSuchMethodException {
+   Method createWithExtraArg = ClientProtocol.class.getMethod("create", String.class,
+     FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class,
+     long.class, CryptoProtocolVersion[].class, String.class);
+
+   return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+       supportedVersions) -> (HdfsFileStatus) createWithExtraArg.invoke(instance, src, masked,
+         clientName, flag, createParent, replication, blockSize, supportedVersions, null);
+ }
+
+ /**
+  * Builds a file creator around the {@code ClientProtocol.create} overload whose last
+  * parameter is the {@code CryptoProtocolVersion[]}.
+  */
+ private static FileCreator createFileCreator2() throws NoSuchMethodException {
+   Method createPlain = ClientProtocol.class.getMethod("create", String.class,
+     FsPermission.class, String.class, EnumSetWritable.class, boolean.class, short.class,
+     long.class, CryptoProtocolVersion[].class);
+
+   return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+       supportedVersions) -> (HdfsFileStatus) createPlain.invoke(instance, src, masked,
+         clientName, flag, createParent, replication, blockSize, supportedVersions);
+ }
+
+ /**
+  * Tries the create overload with the extra trailing parameter first and falls back to the
+  * shorter overload when it does not exist.
+  */
+ private static FileCreator createFileCreator() throws NoSuchMethodException {
+   try {
+     return createFileCreator3();
+   } catch (NoSuchMethodException missing) {
+     LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x");
+     return createFileCreator2();
+   }
+ }
+
+ // cancel the processing if DFSClient is already closed.
+ static final class CancelOnClose implements CancelableProgressable {
+
+ private final DFSClient client;
+
+ public CancelOnClose(DFSClient client) {
+ this.client = client;
+ }
+
+ @Override
+ public boolean progress() {
+ return DFS_CLIENT_ADAPTOR.isClientRunning(client);
+ }
+ }
+
+ // Resolves every reflective helper once at class-load time. Any failure is logged and rethrown
+ // as an Error, so the class cannot be used when the HDFS internals are not accessible.
+ static {
+ try {
+ PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
+ STORAGE_TYPE_SETTER = createStorageTypeSetter();
+ BLOCK_ADDER = createBlockAdder();
+ LEASE_MANAGER = createLeaseManager();
+ DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+ PB_HELPER = createPBHelper();
+ CHECKSUM_CREATER = createChecksumCreater();
+ FILE_CREATOR = createFileCreator();
+ } catch (Exception e) {
+ String msg = "Couldn't properly initialize access to HDFS internals. Please " +
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+ "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
+ }
+
+ /** Begins the file lease for {@code inodeId} on the client via {@code LEASE_MANAGER}. */
+ static void beginFileLease(DFSClient client, long inodeId) {
+ LEASE_MANAGER.begin(client, inodeId);
+ }
+
+ /** Ends the file lease for {@code inodeId} on the client via {@code LEASE_MANAGER}. */
+ static void endFileLease(DFSClient client, long inodeId) {
+ LEASE_MANAGER.end(client, inodeId);
+ }
+
+ /** Creates a data checksum for the client via {@code CHECKSUM_CREATER}. */
+ static DataChecksum createChecksum(DFSClient client) {
+ return CHECKSUM_CREATER.createChecksum(client);
+ }
+
+ /** Extracts the status from the ack via {@code PIPELINE_ACK_STATUS_GETTER}. */
+ static Status getStatus(PipelineAckProto ack) {
+ return PIPELINE_ACK_STATUS_GETTER.get(ack);
+ }
+
+ /**
+  * Appends handlers to the channel's pipeline that decode a {@code BlockOpResponseProto} and
+  * settle {@code promise} accordingly: success with the channel when the status is SUCCESS,
+  * failure on an error status, a closed connection, a reader-idle timeout or any exception.
+  */
+ private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
+ Promise<Channel> promise, int timeoutMs) {
+ channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+ new ProtobufVarint32FrameDecoder(),
+ new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
+ new SimpleChannelInboundHandler<BlockOpResponseProto>() {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
+ throws Exception {
+ // Exceptions thrown below are routed to exceptionCaught of this handler by the pipeline,
+ // which fails the promise.
+ Status pipelineStatus = resp.getStatus();
+ if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
+ throw new IOException("datanode " + dnInfo + " is restarting");
+ }
+ String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
+ if (resp.getStatus() != Status.SUCCESS) {
+ if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException("Got access token error" + ", status message " +
+ resp.getMessage() + ", " + logInfo);
+ } else {
+ throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
+ ", status message " + resp.getMessage() + ", " + logInfo);
+ }
+ }
+ // success
+ // Remove handlers from the tail of the pipeline up to and including the IdleStateHandler,
+ // i.e. the handlers appended by this method.
+ ChannelPipeline p = ctx.pipeline();
+ for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
+ // do not remove all handlers because we may have wrap or unwrap handlers at the header
+ // of pipeline.
+ if (handler instanceof IdleStateHandler) {
+ break;
+ }
+ }
+ // Disable auto read here. Enable it after we setup the streaming pipeline in
+ // FanOutOneBLockAsyncDFSOutput.
+ ctx.channel().config().setAutoRead(false);
+ promise.trySuccess(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ // Only a reader-idle event counts as a timeout; other events are passed on.
+ if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+ promise
+ .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ promise.tryFailure(cause);
+ }
+ });
+ }
+
+ /**
+  * Serializes a write-block request and writes it to the channel: a two-byte data transfer
+  * version, a one-byte opcode, then the length-delimited {@link OpWriteBlockProto}.
+  * @param channel the channel to write and flush the request on
+  * @param storageType the storage type applied to the builder through {@code STORAGE_TYPE_SETTER}
+  * @param writeBlockProtoBuilder builder holding the rest of the request
+  * @throws IOException if writing the message into the buffer fails
+  */
+ private static void requestWriteBlock(Channel channel, Enum<?> storageType,
+     OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
+   OpWriteBlockProto request =
+       STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
+   int bodySize = request.getSerializedSize();
+   // 3 = short version + byte opcode; the rest is the varint length prefix plus the message.
+   int capacity = 3 + CodedOutputStream.computeRawVarint32Size(bodySize) + bodySize;
+   ByteBuf out = channel.alloc().buffer(capacity);
+   out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+   out.writeByte(Op.WRITE_BLOCK.code);
+   request.writeDelimitedTo(new ByteBufOutputStream(out));
+   channel.writeAndFlush(out);
+ }
+
+ /**
+  * Calls {@code trySaslNegotiate} on a connected channel and, once its promise succeeds,
+  * installs the response handlers and sends the write-block request. If the SASL promise fails,
+  * the failure cause is passed on to {@code promise}.
+  */
+ private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+     Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+     DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
+     throws IOException {
+   Promise<Void> negotiated = channel.eventLoop().newPromise();
+   trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, negotiated);
+   negotiated.addListener(new FutureListener<Void>() {
+
+     @Override
+     public void operationComplete(Future<Void> saslResult) throws Exception {
+       if (!saslResult.isSuccess()) {
+         promise.tryFailure(saslResult.cause());
+         return;
+       }
+       // The response handlers must be in place before the request goes out.
+       processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
+       requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
+     }
+   });
+ }
+
+ /**
+  * Starts one connection per location of {@code locatedBlock} and returns a future for each, in
+  * location order. Each future completes with the channel once {@code initialize} has finished
+  * its work on it, or fails with the connect or initialization error.
+  */
+ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
+     String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
+     BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
+     Class<? extends Channel> channelClass) {
+   Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
+   DatanodeInfo[] targets = locatedBlock.getLocations();
+   boolean useHostname =
+       conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+   int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+
+   // The request carries a copy of the block whose length is the located block's size.
+   ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
+   blockCopy.setNumBytes(locatedBlock.getBlockSize());
+   BaseHeaderProto.Builder baseHeader = BaseHeaderProto.newBuilder()
+       .setBlock(PB_HELPER.convert(blockCopy))
+       .setToken(PB_HELPER.convert(locatedBlock.getBlockToken()));
+   ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
+       .setBaseHeader(baseHeader)
+       .setClientName(clientName)
+       .build();
+   ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
+   OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+       .setHeader(header)
+       .setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+       .setPipelineSize(1)
+       .setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+       .setMaxBytesRcvd(maxBytesRcvd)
+       .setLatestGenerationStamp(latestGS)
+       .setRequestedChecksum(checksumProto)
+       .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+
+   List<Future<Channel>> connections = new ArrayList<>(targets.length);
+   for (int idx = 0; idx < targets.length; idx++) {
+     DatanodeInfo dnInfo = targets[idx];
+     Enum<?> storageType = storageTypes[idx];
+     Promise<Channel> promise = eventLoopGroup.next().newPromise();
+     connections.add(promise);
+     String dnAddr = dnInfo.getXferAddr(useHostname);
+     Bootstrap bootstrap = new Bootstrap().group(eventLoopGroup).channel(channelClass)
+         .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
+
+           @Override
+           protected void initChannel(Channel ch) throws Exception {
+             // we need to get the remote address of the channel so we can only move on after
+             // channel connected. Leave an empty implementation here because netty does not
+             // allow a null handler.
+           }
+         });
+     bootstrap.connect(NetUtils.createSocketAddr(dnAddr))
+         .addListener(new ChannelFutureListener() {
+
+           @Override
+           public void operationComplete(ChannelFuture connected) throws Exception {
+             if (!connected.isSuccess()) {
+               promise.tryFailure(connected.cause());
+               return;
+             }
+             initialize(conf, connected.channel(), dnInfo, storageType, writeBlockProtoBuilder,
+                 timeoutMs, client, locatedBlock.getBlockToken(), promise);
+           }
+         });
+   }
+   return connections;
+ }
+
+ /**
+ * Exception other than RemoteException thrown when calling create on namenode. It carries the
+ * original exception as its cause.
+ */
+ public static class NameNodeException extends IOException {
+
+ private static final long serialVersionUID = 3143237406477095390L;
+
+ /**
+ * @param cause the exception being wrapped
+ */
+ public NameNodeException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ /**
+ * Creates the file through {@code FILE_CREATOR}, begins its lease, adds a block through
+ * {@code BLOCK_ADDER}, connects to every datanode of that block and wraps the connected channels
+ * in a {@link FanOutOneBlockAsyncDFSOutput}.
+ * <p>
+ * Retry policy, per loop iteration: a {@link RemoteException} is retried only when
+ * {@link #shouldRetryCreate(RemoteException)} accepts it and the retry budget is not used up;
+ * otherwise it is unwrapped and thrown. Any other {@link IOException} is retried, after a pause
+ * and with {@code overwrite} forced to true, until the retry budget is used up. A datanode whose
+ * connection failed is added to the exclude list passed to the next block allocation.
+ * <p>
+ * On every failed attempt the channels that do connect are closed and the lease is ended.
+ * @throws IOException if the file cannot be created or the retries are exhausted; failures of
+ * the create call other than {@link RemoteException} are wrapped in {@link NameNodeException}
+ */
+ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
+ boolean overwrite, boolean createParent, short replication, long blockSize,
+ EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+ Configuration conf = dfs.getConf();
+ FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+ DFSClient client = dfs.getClient();
+ String clientName = client.getClientName();
+ ClientProtocol namenode = client.getNamenode();
+ int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+ DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+ DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+ for (int retry = 0;; retry++) {
+ HdfsFileStatus stat;
+ try {
+ stat = FILE_CREATOR.create(namenode, src,
+ FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+ new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+ createParent, replication, blockSize, CryptoProtocolVersion.supported());
+ } catch (Exception e) {
+ // A failed create call is never retried here: it is rethrown, wrapped unless it already
+ // is a RemoteException.
+ if (e instanceof RemoteException) {
+ throw (RemoteException) e;
+ } else {
+ throw new NameNodeException(e);
+ }
+ }
+ beginFileLease(client, stat.getFileId());
+ // succ stays false on every path except the successful return, so the finally block below
+ // cleans up after any failure.
+ boolean succ = false;
+ LocatedBlock locatedBlock = null;
+ List<Future<Channel>> futureList = null;
+ try {
+ DataChecksum summer = createChecksum(client);
+ locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
+ excludesNodes, stat.getFileId(), null);
+ List<Channel> datanodeList = new ArrayList<>();
+ futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+ PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
+ // Wait for the connections in order; the first failure aborts this attempt.
+ for (int i = 0, n = futureList.size(); i < n; i++) {
+ try {
+ datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+ } catch (Exception e) {
+ // exclude the broken DN next time
+ excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+ throw e;
+ }
+ }
+ Encryptor encryptor = createEncryptor(conf, stat, client);
+ FanOutOneBlockAsyncDFSOutput output =
+ new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+ stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+ succ = true;
+ return output;
+ } catch (RemoteException e) {
+ LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+ // Falling out of this catch without throwing starts the next loop iteration immediately,
+ // without a pause.
+ if (shouldRetryCreate(e)) {
+ if (retry >= createMaxRetries) {
+ throw e.unwrapRemoteException();
+ }
+ } else {
+ throw e.unwrapRemoteException();
+ }
+ } catch (IOException e) {
+ LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+ if (retry >= createMaxRetries) {
+ throw e;
+ }
+ // overwrite the old broken file.
+ overwrite = true;
+ try {
+ Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+ } catch (InterruptedException ie) {
+ throw new InterruptedIOException();
+ }
+ } finally {
+ if (!succ) {
+ if (futureList != null) {
+ // Some connections may still be in flight, so close each channel from a listener that
+ // runs once its future completes.
+ for (Future<Channel> f : futureList) {
+ f.addListener(new FutureListener<Channel>() {
+
+ @Override
+ public void operationComplete(Future<Channel> future) throws Exception {
+ if (future.isSuccess()) {
+ future.getNow().close();
+ }
+ }
+ });
+ }
+ }
+ endFileLease(client, stat.getFileId());
+ }
+ }
+ }
+ }
+
+ /**
+  * Create a {@link FanOutOneBlockAsyncDFSOutput}. This method may block, so do not call it from
+  * inside an {@link EventLoop}.
+  */
+ public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
+     boolean overwrite, boolean createParent, short replication, long blockSize,
+     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+   FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput> resolver =
+       new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
+
+         @Override
+         public FanOutOneBlockAsyncDFSOutput doCall(Path p)
+             throws IOException, UnresolvedLinkException {
+           String src = p.toUri().getPath();
+           return createOutput(dfs, src, overwrite, createParent, replication, blockSize,
+               eventLoopGroup, channelClass);
+         }
+
+         @Override
+         public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
+           // Continuing the call on a different file system is not supported.
+           throw new UnsupportedOperationException();
+         }
+       };
+   return resolver.resolve(dfs, f);
+ }
+
+ public static boolean shouldRetryCreate(RemoteException e) {
+ // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
+ // For exceptions other than this, we just throw it out. This is same with
+ // DFSOutputStream.newStreamForCreate.
+ return e.getClassName().endsWith("RetryStartFileException");
+ }
+
+ /**
+  * Asks the namenode to complete the file, retrying without limit (with a pause between
+  * attempts) until the namenode reports completion or the lease is found to be expired.
+  * <p>
+  * The file lease is ended only when the namenode reports completion. When the lease has
+  * expired the method logs a warning and gives up without ending it.
+  * @param client the DFS client owning the lease
+  * @param namenode the namenode proxy used for the complete call
+  * @param src path of the file being completed
+  * @param clientName name of the client that wrote the file
+  * @param block the last block of the file
+  * @param fileId id of the file
+  */
+ static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
+     ExtendedBlock block, long fileId) {
+   for (int retry = 0;; retry++) {
+     try {
+       if (namenode.complete(src, clientName, block, fileId)) {
+         endFileLease(client, fileId);
+         return;
+       }
+       LOG.warn("complete file {} not finished, retry = {}", src, retry);
+     } catch (RemoteException e) {
+       IOException ioe = e.unwrapRemoteException();
+       if (ioe instanceof LeaseExpiredException) {
+         LOG.warn("lease for file {} is expired, give up", src, e);
+         return;
+       }
+       LOG.warn("complete file {} failed, retry = {}", src, retry, e);
+     } catch (Exception e) {
+       // Best effort: every other failure is logged and retried after the pause below.
+       LOG.warn("complete file {} failed, retry = {}", src, retry, e);
+     }
+     sleepIgnoreInterrupt(retry);
+   }
+ }
+
+ /**
+  * Sleeps for the pause that {@code ConnectionUtils.getPauseTime(100, retry)} returns. An
+  * interrupt ends the sleep early and is otherwise ignored.
+  * @param retry the retry count passed to {@code ConnectionUtils.getPauseTime}
+  */
+ static void sleepIgnoreInterrupt(int retry) {
+   long pauseMs = ConnectionUtils.getPauseTime(100, retry);
+   try {
+     Thread.sleep(pauseMs);
+   } catch (InterruptedException ignored) {
+     // Deliberately ignored, as the method name says: the caller simply proceeds.
+   }
+ }
+}
diff --git a/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
new file mode 100644
index 0000000..80ffb7e
--- /dev/null
+++ b/metron-platform/metron-hbase-server/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -0,0 +1,783 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.Decryptor;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(
+ FanOutOneBlockAsyncDFSOutputSaslHelper.class);
+
+ // Not instantiable; the constructor is private and empty.
+ private FanOutOneBlockAsyncDFSOutputSaslHelper() {
+ }
+
+ // Server name, protocol and mechanism passed to Sasl.createSaslClient when a
+ // SaslNegotiateHandler is constructed.
+ private static final String SERVER_NAME = "0";
+ private static final String PROTOCOL = "hdfs";
+ private static final String MECHANISM = "DIGEST-MD5";
+ // Written as a 4-byte int ahead of the first SASL message when the negotiate handler is added.
+ private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+ // NOTE(review): not referenced in the part of the file reviewed here; presumably used when
+ // building the SASL user name -- confirm against the rest of the class.
+ private static final String NAME_DELIMITER = " ";
+
+ /**
+ * Accessors for three values held by a {@link SaslDataTransferClient}. The implementation built
+ * by {@code createSaslAdaptor} reads the corresponding private fields reflectively.
+ */
+ private interface SaslAdaptor {
+
+ TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
+
+ SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
+
+ AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
+ }
+
+ // Assigned once in the static initializer.
+ private static final SaslAdaptor SASL_ADAPTOR;
+
+ /**
+ * Builds the {@link Encryptor} for a file from its {@link FileEncryptionInfo}. Two
+ * implementations exist in this class; which one is used is decided by
+ * {@code createTransparentCryptoHelper}.
+ */
+ private interface TransparentCryptoHelper {
+
+ Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
+ throws IOException;
+ }
+
+ // Assigned once in the static initializer.
+ private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
+
+ private static SaslAdaptor createSaslAdaptor()
+ throws NoSuchFieldException, NoSuchMethodException {
+ Field saslPropsResolverField =
+ SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
+ saslPropsResolverField.setAccessible(true);
+ Field trustedChannelResolverField =
+ SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
+ trustedChannelResolverField.setAccessible(true);
+ Field fallbackToSimpleAuthField =
+ SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
+ fallbackToSimpleAuthField.setAccessible(true);
+ return new SaslAdaptor() {
+
+ @Override
+ public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
+ try {
+ return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
+ try {
+ return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
+ try {
+ return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ /**
+  * Builds a helper that decrypts the data encryption key by reflectively invoking
+  * {@code DFSClient.decryptEncryptedDataEncryptionKey(FileEncryptionInfo)} on the client.
+  * @throws NoSuchMethodException if {@link DFSClient} declares no such method
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
+     throws NoSuchMethodException {
+   Method decryptMethod = DFSClient.class
+       .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
+   decryptMethod.setAccessible(true);
+   return new TransparentCryptoHelper() {
+
+     @Override
+     public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+         DFSClient client) throws IOException {
+       try {
+         KeyVersion key = (KeyVersion) decryptMethod.invoke(client, feInfo);
+         Encryptor result =
+             CryptoCodec.getInstance(conf, feInfo.getCipherSuite()).createEncryptor();
+         result.init(key.getMaterial(), feInfo.getIV());
+         return result;
+       } catch (IllegalAccessException e) {
+         throw new RuntimeException(e);
+       } catch (GeneralSecurityException e) {
+         throw new IOException(e);
+       } catch (InvocationTargetException e) {
+         // Rethrow what the target method threw: as is when possible, wrapped otherwise.
+         Throwable target = e.getTargetException();
+         Throwables.propagateIfPossible(target, IOException.class);
+         throw new RuntimeException(target);
+       }
+     }
+   };
+ }
+
+ /**
+  * Builds a helper that decrypts the data encryption key by reflectively invoking the static
+  * {@code decryptEncryptedDataEncryptionKey(FileEncryptionInfo, KeyProvider)} method of
+  * {@code org.apache.hadoop.hdfs.HdfsKMSUtil}, passing the client's key provider.
+  * @throws ClassNotFoundException if {@code HdfsKMSUtil} is not on the classpath
+  * @throws NoSuchMethodException if that class declares no such method
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
+     throws ClassNotFoundException, NoSuchMethodException {
+   Class<?> kmsUtil = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
+   Method decryptMethod = kmsUtil.getDeclaredMethod(
+       "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
+   decryptMethod.setAccessible(true);
+   return new TransparentCryptoHelper() {
+
+     @Override
+     public Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo,
+         DFSClient client) throws IOException {
+       try {
+         // Static method, hence the null receiver.
+         KeyVersion key =
+             (KeyVersion) decryptMethod.invoke(null, feInfo, client.getKeyProvider());
+         Encryptor result =
+             CryptoCodec.getInstance(conf, feInfo.getCipherSuite()).createEncryptor();
+         result.init(key.getMaterial(), feInfo.getIV());
+         return result;
+       } catch (IllegalAccessException e) {
+         throw new RuntimeException(e);
+       } catch (GeneralSecurityException e) {
+         throw new IOException(e);
+       } catch (InvocationTargetException e) {
+         // Rethrow what the target method threw: as is when possible, wrapped otherwise.
+         Throwable target = e.getTargetException();
+         Throwables.propagateIfPossible(target, IOException.class);
+         throw new RuntimeException(target);
+       }
+     }
+   };
+ }
+
+ /**
+  * Picks the crypto helper matching the Hadoop version on the classpath: the
+  * {@link DFSClient}-based one first, and the {@code HdfsKMSUtil}-based one when
+  * {@link DFSClient} lacks the required method.
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelper()
+     throws NoSuchMethodException, ClassNotFoundException {
+   try {
+     return createTransparentCryptoHelperWithoutHDFS12396();
+   } catch (NoSuchMethodException e) {
+     LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
+         " should be hadoop version with HDFS-12396", e);
+     return createTransparentCryptoHelperWithHDFS12396();
+   }
+ }
+
+ // Sets up the reflective accessors once at class-load time. Any failure is logged and rethrown
+ // as an Error, so the class fails to initialize.
+ static {
+ try {
+ SASL_ADAPTOR = createSaslAdaptor();
+ TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+ } catch (Exception e) {
+ String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ + "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
+ }
+
+ /**
+  * Answers the callbacks raised by the client-side SASL object with a fixed user name and
+  * password.
+  */
+ private static final class SaslClientCallbackHandler implements CallbackHandler {
+
+   private final char[] password;
+   private final String userName;
+
+   /**
+    * Creates a new SaslClientCallbackHandler.
+    * @param userName SASL user name
+    * @param password SASL password
+    */
+   public SaslClientCallbackHandler(String userName, char[] password) {
+     this.password = password;
+     this.userName = userName;
+   }
+
+   /**
+    * Supplies the user name, the password and the default realm to the matching callbacks.
+    * Realm-choice callbacks are skipped. Every callback is classified before any of them is
+    * answered, so an unsupported callback leaves all the others untouched.
+    * @throws UnsupportedCallbackException if a callback of any other type is present
+    */
+   @Override
+   public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+     NameCallback nameCallback = null;
+     PasswordCallback passwordCallback = null;
+     RealmCallback realmCallback = null;
+     for (Callback candidate : callbacks) {
+       if (candidate instanceof NameCallback) {
+         nameCallback = (NameCallback) candidate;
+       } else if (candidate instanceof PasswordCallback) {
+         passwordCallback = (PasswordCallback) candidate;
+       } else if (candidate instanceof RealmCallback) {
+         realmCallback = (RealmCallback) candidate;
+       } else if (!(candidate instanceof RealmChoiceCallback)) {
+         throw new UnsupportedCallbackException(candidate, "Unrecognized SASL client callback");
+       }
+     }
+     if (nameCallback != null) {
+       nameCallback.setName(userName);
+     }
+     if (passwordCallback != null) {
+       passwordCallback.setPassword(password);
+     }
+     if (realmCallback != null) {
+       realmCallback.setText(realmCallback.getDefaultText());
+     }
+   }
+ }
+
+ /**
+  * Drives the client side of the SASL handshake on a channel and reports the outcome through a
+  * promise.
+  * <p>
+  * The handshake has two steps. When the handler is added it sends the magic number and an empty
+  * first message. Step 1 answers the server's challenge (optionally offering cipher options) and
+  * step 2 verifies the negotiated quality of protection, clears the pipeline, installs the
+  * encryption or SASL wrap/unwrap handlers if needed, and completes the promise.
+  * <p>
+  * The promise is failed on a pipeline exception, on a reader-idle timeout, and when the channel
+  * goes inactive before the handshake finished.
+  */
+ private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
+
+   private final Configuration conf;
+
+   private final Map<String, String> saslProps;
+
+   private final SaslClient saslClient;
+
+   private final int timeoutMs;
+
+   private final Promise<Void> promise;
+
+   private final DFSClient dfsClient;
+
+   // 0 until the first message has been sent, then 1 and 2 as server responses are processed.
+   private int step = 0;
+
+   /**
+    * @param conf configuration consulted for the cipher suites and the crypto codec
+    * @param username SASL user name
+    * @param password SASL password
+    * @param saslProps SASL properties; {@code Sasl.QOP} is read as a comma-separated list
+    * @param timeoutMs timeout reported in the failure message on reader idle
+    * @param promise completed once negotiation ends, successfully or not
+    * @param dfsClient client whose data encryption key is cleared on an unknown-key error
+    * @throws SaslException if the SASL client cannot be created
+    */
+   public SaslNegotiateHandler(Configuration conf, String username, char[] password,
+       Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
+       DFSClient dfsClient) throws SaslException {
+     this.conf = conf;
+     this.saslProps = saslProps;
+     this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
+       SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
+     this.timeoutMs = timeoutMs;
+     this.promise = promise;
+     this.dfsClient = dfsClient;
+   }
+
+   /** Writes a SASL message without cipher options; does not flush. */
+   private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
+     sendSaslMessage(ctx, payload, null);
+   }
+
+   /**
+    * Returns the cipher options to offer, or null when no cipher suite is configured.
+    * @throws IOException if the configured value is anything but AES/CTR/NoPadding
+    */
+   private List<CipherOption> getCipherOptions() throws IOException {
+     // Negotiate cipher suites if configured. Currently, the only supported
+     // cipher suite is AES/CTR/NoPadding, but the protocol allows multiple
+     // values for future expansion.
+     String cipherSuites = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+     if (StringUtils.isBlank(cipherSuites)) {
+       return null;
+     }
+     if (!cipherSuites.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+       throw new IOException(String.format("Invalid cipher suite, %s=%s",
+         DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, cipherSuites));
+     }
+     return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
+   }
+
+   /**
+    * Writes a length-delimited {@link DataTransferEncryptorMessageProto} with SUCCESS status, the
+    * given payload and cipher options (either may be null); does not flush.
+    */
+   private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+       List<CipherOption> options) throws IOException {
+     DataTransferEncryptorMessageProto.Builder builder =
+       DataTransferEncryptorMessageProto.newBuilder();
+     builder.setStatus(DataTransferEncryptorStatus.SUCCESS);
+     if (payload != null) {
+       // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol
+       // and we want to keep that out of hbase-server.
+       builder.setPayload(ByteString.copyFrom(payload));
+     }
+     if (options != null) {
+       builder.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
+     }
+     DataTransferEncryptorMessageProto proto = builder.build();
+     int size = proto.getSerializedSize();
+     size += CodedOutputStream.computeRawVarint32Size(size);
+     ByteBuf buf = ctx.alloc().buffer(size);
+     proto.writeDelimitedTo(new ByteBufOutputStream(buf));
+     ctx.write(buf);
+   }
+
+   /** Opens the handshake: magic number, then an empty SASL message, flushed together. */
+   @Override
+   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+     ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+     sendSaslMessage(ctx, new byte[0]);
+     ctx.flush();
+     step++;
+   }
+
+   /**
+    * Disposes the SASL client and fails the negotiation promise. Without the failure a
+    * connection closed cleanly in the middle of the handshake would leave the promise pending,
+    * because neither {@link #exceptionCaught} nor the idle timeout reports it. The event is then
+    * forwarded down the pipeline.
+    */
+   @Override
+   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+     saslClient.dispose();
+     // No-op if the promise has already been completed.
+     promise.tryFailure(new IOException("connection to " + ctx.channel().remoteAddress()
+       + " is closed during SASL negotiation"));
+     ctx.fireChannelInactive();
+   }
+
+   /**
+    * Turns an error status from the server into an exception; an unknown-key error also clears
+    * the client's cached data encryption key.
+    */
+   private void check(DataTransferEncryptorMessageProto proto) throws IOException {
+     if (proto.getStatus() == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+       dfsClient.clearDataEncryptionKey();
+       throw new InvalidEncryptionKeyException(proto.getMessage());
+     } else if (proto.getStatus() == DataTransferEncryptorStatus.ERROR) {
+       throw new IOException(proto.getMessage());
+     }
+   }
+
+   private String getNegotiatedQop() {
+     return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+   }
+
+   private boolean isNegotiatedQopPrivacy() {
+     // equalsIgnoreCase returns false for a null argument, so no separate null check is needed.
+     return "auth-conf".equalsIgnoreCase(getNegotiatedQop());
+   }
+
+   /** Parses the comma-separated {@code Sasl.QOP} property into a set. */
+   private Set<String> getRequestedQop() {
+     return ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+   }
+
+   private boolean requestedQopContainsPrivacy() {
+     return getRequestedQop().contains("auth-conf");
+   }
+
+   /**
+    * Verifies that the handshake is complete and that the negotiated quality of protection is one
+    * of the requested ones.
+    * @throws IOException if either condition does not hold
+    */
+   private void checkSaslComplete() throws IOException {
+     if (!saslClient.isComplete()) {
+       throw new IOException("Failed to complete SASL handshake");
+     }
+     Set<String> requestedQop = getRequestedQop();
+     String negotiatedQop = getNegotiatedQop();
+     LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}", requestedQop,
+       negotiatedQop);
+     if (!requestedQop.contains(negotiatedQop)) {
+       throw new IOException(String.format("SASL handshake completed, but "
+         + "channel does not have acceptable quality of protection, "
+         + "requested = %s, negotiated = %s",
+         requestedQop, negotiatedQop));
+     }
+   }
+
+   /** Returns true when a QOP other than plain {@code auth} was negotiated. */
+   private boolean useWrap() {
+     String qop = getNegotiatedQop();
+     return qop != null && !"auth".equalsIgnoreCase(qop);
+   }
+
+   /** Returns a copy of the option whose in and out keys have been SASL-unwrapped. */
+   private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
+     byte[] inKey = option.getInKey();
+     if (inKey != null) {
+       inKey = saslClient.unwrap(inKey, 0, inKey.length);
+     }
+     byte[] outKey = option.getOutKey();
+     if (outKey != null) {
+       outKey = saslClient.unwrap(outKey, 0, outKey.length);
+     }
+     return new CipherOption(option.getCipherSuite(), inKey, option.getInIv(), outKey,
+       option.getOutIv());
+   }
+
+   /**
+    * Returns the first cipher option of the server's message, with its keys unwrapped when
+    * privacy was negotiated, or null if the message carries none.
+    */
+   private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
+       boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+     List<CipherOption> cipherOptions =
+       PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
+     if (cipherOptions == null || cipherOptions.isEmpty()) {
+       return null;
+     }
+     CipherOption cipherOption = cipherOptions.get(0);
+     return isNegotiatedQopPrivacy ? unwrap(cipherOption, saslClient) : cipherOption;
+   }
+
+   /**
+    * Processes a server handshake message according to the current step; any other message is
+    * passed on untouched.
+    */
+   @Override
+   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+     if (msg instanceof DataTransferEncryptorMessageProto) {
+       DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
+       check(proto);
+       byte[] challenge = proto.getPayload().toByteArray();
+       byte[] response = saslClient.evaluateChallenge(challenge);
+       switch (step) {
+         case 1: {
+           List<CipherOption> cipherOptions = null;
+           if (requestedQopContainsPrivacy()) {
+             cipherOptions = getCipherOptions();
+           }
+           sendSaslMessage(ctx, response, cipherOptions);
+           ctx.flush();
+           step++;
+           break;
+         }
+         case 2: {
+           assert response == null;
+           checkSaslComplete();
+           CipherOption cipherOption =
+             getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+           // Negotiation is over: drop every handler, this one included.
+           ChannelPipeline p = ctx.pipeline();
+           while (p.first() != null) {
+             p.removeFirst();
+           }
+           if (cipherOption != null) {
+             CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
+             p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
+               new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
+           } else {
+             if (useWrap()) {
+               p.addLast(new SaslWrapHandler(saslClient),
+                 new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+                 new SaslUnwrapHandler(saslClient));
+             }
+           }
+           promise.trySuccess(null);
+           break;
+         }
+         default:
+           throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+       }
+     } else {
+       ctx.fireChannelRead(msg);
+     }
+   }
+
+   @Override
+   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+     promise.tryFailure(cause);
+   }
+
+   /** Fails the promise on a reader-idle event; every other event is passed on. */
+   @Override
+   public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+     if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+       promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+     } else {
+       super.userEventTriggered(ctx, evt);
+     }
+   }
+ }
+
+ /**
+  * Inbound handler that strips the 4-byte length prefix from each frame, SASL-unwraps the rest
+  * and passes the plain bytes on to the next handler.
+  */
+ private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+   private final SaslClient saslClient;
+
+   public SaslUnwrapHandler(SaslClient saslClient) {
+     this.saslClient = saslClient;
+   }
+
+   /**
+    * Disposes the SASL client, then forwards the event. Forwarding matters: handlers placed
+    * after this one rely on {@code channelInactive} to learn that the connection is gone, and
+    * an override that does not forward the event ends its propagation here.
+    */
+   @Override
+   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+     saslClient.dispose();
+     ctx.fireChannelInactive();
+   }
+
+   @Override
+   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+     // The frame still carries its 4-byte length field; skip it.
+     msg.skipBytes(4);
+     byte[] b = new byte[msg.readableBytes()];
+     msg.readBytes(b);
+     ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
+   }
+ }
+
+ /**
+  * Outbound handler that accumulates written buffers and, on flush, wraps everything
+  * accumulated so far with the SASL client and writes it as one frame: a 4-byte length
+  * followed by the wrapped bytes.
+  */
+ private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+   private final SaslClient saslClient;
+
+   // Accumulates pending writes until the next flush; null once the handler has been closed.
+   private CompositeByteBuf cBuf;
+
+   public SaslWrapHandler(SaslClient saslClient) {
+     this.saslClient = saslClient;
+   }
+
+   @Override
+   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+     cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
+   }
+
+   @Override
+   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+       throws Exception {
+     if (msg instanceof ByteBuf) {
+       ByteBuf buf = (ByteBuf) msg;
+       cBuf.addComponent(buf);
+       // addComponent does not advance the writer index, so move it by hand.
+       cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
+       // NOTE(review): the promise of a buffered write is never completed by this handler;
+       // confirm that no caller waits on it.
+     } else {
+       // Hand the caller's promise on so that it is completed with the outcome of the write.
+       ctx.write(msg, promise);
+     }
+   }
+
+   @Override
+   public void flush(ChannelHandlerContext ctx) throws Exception {
+     // cBuf is null after close(); in that case there is nothing left to wrap.
+     if (cBuf != null && cBuf.isReadable()) {
+       byte[] b = new byte[cBuf.readableBytes()];
+       cBuf.readBytes(b);
+       cBuf.discardReadComponents();
+       byte[] wrapped = saslClient.wrap(b, 0, b.length);
+       ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
+       buf.writeInt(wrapped.length);
+       buf.writeBytes(wrapped);
+       ctx.write(buf);
+     }
+     ctx.flush();
+   }
+
+   /**
+    * Releases the accumulation buffer and forwards the close request. Without the forward the
+    * request would stop at this handler and the promise would never be completed.
+    */
+   @Override
+   public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+     try {
+       // Guard against a repeated close, which would otherwise hit a null buffer.
+       if (cBuf != null) {
+         cBuf.release();
+         cBuf = null;
+       }
+     } finally {
+       ctx.close(promise);
+     }
+   }
+ }
+
+ /**
+  * Inbound handler that decrypts every incoming buffer with a {@link Decryptor} created from
+  * the given codec and passes the decrypted bytes on in a newly allocated direct buffer.
+  */
+ private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+   private final Decryptor decryptor;
+
+   public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+       throws GeneralSecurityException, IOException {
+     this.decryptor = codec.createDecryptor();
+     // The decryptor is given its own copy of the IV.
+     this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
+   }
+
+   @Override
+   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+     // Input that is not backed by exactly one NIO buffer is first copied into a single
+     // direct buffer, which this method then owns and must release.
+     boolean copied = msg.nioBufferCount() != 1;
+     ByteBuf inBuf = copied ? ctx.alloc().directBuffer(msg.readableBytes()) : msg;
+     ByteBuf outBuf = null;
+     boolean decrypted = false;
+     try {
+       if (copied) {
+         msg.readBytes(inBuf);
+       }
+       int length = inBuf.readableBytes();
+       ByteBuffer inBuffer = inBuf.nioBuffer();
+       outBuf = ctx.alloc().directBuffer(length);
+       ByteBuffer outBuffer = outBuf.nioBuffer(0, length);
+       decryptor.decrypt(inBuffer, outBuffer);
+       outBuf.writerIndex(length);
+       decrypted = true;
+     } finally {
+       if (copied) {
+         inBuf.release();
+       }
+       // On failure nobody downstream will ever see outBuf, so release it here.
+       if (!decrypted && outBuf != null) {
+         outBuf.release();
+       }
+     }
+     ctx.fireChannelRead(outBuf);
+   }
+ }
+
+ /**
+  * Outbound encoder that encrypts every written buffer with an {@link Encryptor} created from
+  * the given codec.
+  */
+ private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
+
+   private final Encryptor encryptor;
+
+   public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+       throws GeneralSecurityException, IOException {
+     this.encryptor = codec.createEncryptor();
+     // The encryptor is given its own copy of the IV.
+     this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
+   }
+
+   /** Allocates an output buffer with an initial capacity equal to the readable bytes of the input. */
+   @Override
+   protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
+       throws Exception {
+     if (preferDirect) {
+       return ctx.alloc().directBuffer(msg.readableBytes());
+     } else {
+       return ctx.alloc().buffer(msg.readableBytes());
+     }
+   }
+
+   @Override
+   protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+     // Input that is not backed by exactly one NIO buffer is first copied into a single
+     // direct buffer, which this method then owns and must release.
+     boolean copied = msg.nioBufferCount() != 1;
+     ByteBuf inBuf = copied ? ctx.alloc().directBuffer(msg.readableBytes()) : msg;
+     try {
+       if (copied) {
+         msg.readBytes(inBuf);
+       }
+       int length = inBuf.readableBytes();
+       ByteBuffer inBuffer = inBuf.nioBuffer();
+       ByteBuffer outBuffer = out.nioBuffer(0, length);
+       encryptor.encrypt(inBuffer, outBuffer);
+       out.writerIndex(length);
+     } finally {
+       // Release the temporary copy even when encryption fails.
+       if (copied) {
+         inBuf.release();
+       }
+     }
+   }
+ }
+
+ /**
+  * Builds the SASL user name for an encrypted handshake: key id, block pool id and the
+  * Base64-encoded nonce, joined by {@code NAME_DELIMITER}.
+  */
+ private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
+   String encodedNonce =
+       new String(Base64.encodeBase64(encryptionKey.nonce, false), Charsets.UTF_8);
+   return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
+       + encodedNonce;
+ }
+
+ /** Returns the Base64 encoding (unchunked) of the given key bytes as a character array. */
+ private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+   byte[] encoded = Base64.encodeBase64(encryptionKey, false);
+   String password = new String(encoded, Charsets.UTF_8);
+   return password.toCharArray();
+ }
+
+ /** Returns the Base64 encoding (unchunked) of the block token's identifier. */
+ private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
+   byte[] encodedIdentifier = Base64.encodeBase64(blockToken.getIdentifier(), false);
+   return new String(encodedIdentifier, Charsets.UTF_8);
+ }
+
+ /** Returns the Base64 encoding (unchunked) of the block token's password as a character array. */
+ private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+   byte[] encodedPassword = Base64.encodeBase64(blockToken.getPassword(), false);
+   String password = new String(encodedPassword, Charsets.UTF_8);
+   return password.toCharArray();
+ }
+
+ /**
+  * Creates the SASL properties for an encrypted handshake: privacy QOP, server
+  * authentication enabled, and the given algorithm as the digest cipher.
+  */
+ private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
+   String privacyQop = QualityOfProtection.PRIVACY.getSaslQop();
+   Map<String, String> props = Maps.newHashMapWithExpectedSize(3);
+   props.put(Sasl.QOP, privacyQop);
+   props.put(Sasl.SERVER_AUTH, "true");
+   props.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+   return props;
+ }
+
+ /**
+  * Appends the handlers needed for SASL negotiation to the channel's pipeline: an idle-state
+  * handler with {@code timeoutMs} as the reader-idle time, the protobuf frame decoder and
+  * message decoder, and the negotiation handler itself. A {@link SaslException} raised while
+  * doing so fails {@code saslPromise} instead of being thrown.
+  */
+ private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
+     String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
+     DFSClient dfsClient) {
+   try {
+     IdleStateHandler idleHandler = new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS);
+     ProtobufVarint32FrameDecoder frameDecoder = new ProtobufVarint32FrameDecoder();
+     ProtobufDecoder messageDecoder =
+         new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance());
+     SaslNegotiateHandler negotiateHandler = new SaslNegotiateHandler(conf, username, password,
+         saslProps, timeoutMs, saslPromise, dfsClient);
+     channel.pipeline().addLast(idleHandler, frameDecoder, messageDecoder, negotiateHandler);
+   } catch (SaslException e) {
+     saslPromise.tryFailure(e);
+   }
+ }
+
+ /**
+  * Decides whether a SASL handshake is needed for the given channel and either starts it or
+  * completes {@code saslPromise} straight away. The checks run in this order: trusted
+  * channel, data encryption key available, security disabled, privileged datanode transfer
+  * port, fallback to simple auth, SASL properties resolver present; if none applies the
+  * handshake is skipped.
+  */
+ static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+     int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
+     Promise<Void> saslPromise) throws IOException {
+   SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
+   SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
+   TrustedChannelResolver trustedChannelResolver =
+       SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
+   AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
+   InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+
+   if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
+   if (encryptionKey != null) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(
+         "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
+       encryptionKeyToPassword(encryptionKey.encryptionKey),
+       createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
+       client);
+     return;
+   }
+
+   if (!UserGroupInformation.isSecurityEnabled()) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
+           + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (dnInfo.getXferPort() < 1024) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in secured configuration with "
+           + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in secured configuration with "
+           + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (saslPropsResolver != null) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(
+         "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
+       buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
+       client);
+     return;
+   }
+
+   // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
+   // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
+   // edge case.
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
+         + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
+   }
+   saslPromise.trySuccess(null);
+ }
+
+ /**
+  * Creates an encryptor for the file described by {@code stat}.
+  *
+  * @return the encryptor, or {@code null} when the file status carries no encryption info
+  */
+ static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
+     throws IOException {
+   FileEncryptionInfo feInfo = stat.getFileEncryptionInfo();
+   return feInfo == null ? null : TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, feInfo, client);
+ }
+}
diff --git a/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java b/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
index 08e8abd..6724095 100644
--- a/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
+++ b/metron-platform/metron-hbase-server/src/test/java/org/apache/metron/hbase/coprocessor/EnrichmentCoprocessorIntegrationTest.java
@@ -169,7 +169,9 @@ public class EnrichmentCoprocessorIntegrationTest extends BaseIntegrationTest {
@AfterClass
public static void teardown() throws Exception {
- HBaseUtil.INSTANCE.teardown(testUtil);
+ if (null != testUtil) {
+ HBaseUtil.INSTANCE.teardown(testUtil);
+ }
componentRunner.stop();
resetLogging();
}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/pom.xml b/metron-platform/metron-hbase/metron-hbase-common/pom.xml
index 548d976..233519a 100644
--- a/metron-platform/metron-hbase/metron-hbase-common/pom.xml
+++ b/metron-platform/metron-hbase/metron-hbase-common/pom.xml
@@ -101,7 +101,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
- <classifier>tests</classifier>
<version>${global_hadoop_version}</version>
<scope>provided</scope>
<exclusions>
@@ -129,7 +128,6 @@
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
- <classifier>tests</classifier>
<version>${global_hadoop_version}</version>
<scope>provided</scope>
<exclusions>
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
new file mode 100644
index 0000000..a328de9
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputHelper.java
@@ -0,0 +1,899 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.fs.CreateFlag.CREATE;
+import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
+import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
+import static org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage.PIPELINE_SETUP_CREATE;
+import static org.apache.hbase.thirdparty.io.netty.channel.ChannelOption.CONNECT_TIMEOUT_MILLIS;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemLinkResolver;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.UnresolvedLinkException;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Op;
+import org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.PipelineAckProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.ExtendedBlockProto;
+import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+import org.apache.hbase.thirdparty.io.netty.bootstrap.Bootstrap;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFuture;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelFutureListener;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelInitializer;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Future;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.FutureListener;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for implementing {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputHelper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputHelper.class);
+
+ private FanOutOneBlockAsyncDFSOutputHelper() {
+ }
+
+ public static final String ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = "hbase.fs.async.create.retries";
+
+ public static final int DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES = 10;
+ // use pooled allocator for performance.
+ private static final ByteBufAllocator ALLOC = PooledByteBufAllocator.DEFAULT;
+
+ // copied from DFSPacket since it is package private.
+ public static final long HEART_BEAT_SEQNO = -1L;
+
+ // Timeouts for communicating with DataNode for streaming writes/reads
+ public static final int READ_TIMEOUT = 60 * 1000;
+
+ private static final DatanodeInfo[] EMPTY_DN_ARRAY = new DatanodeInfo[0];
+
+ /**
+  * Helper for getting the {@link Status} out of a {@link PipelineAckProto}. In Hadoop 2.6 or
+  * earlier the proto has a getStatus method; from Hadoop 2.7 on the status is derived from a
+  * flag, which is either read directly from the proto or built by combining the proto's reply
+  * field with an ECN object. See createPipelineAckStatusGetter for more details.
+  */
+ private interface PipelineAckStatusGetter {
+ Status get(PipelineAckProto ack);
+ }
+
+ private static final PipelineAckStatusGetter PIPELINE_ACK_STATUS_GETTER;
+
+ /**
+  * Sets the storage type on an {@link OpWriteBlockProto.Builder}. The StorageType enum lives
+  * under o.a.h.hdfs in Hadoop 2.6 and under o.a.h.fs in Hadoop 2.7, so it is passed in as a
+  * plain {@code Enum} and set through reflection. See createStorageTypeSetter for more details.
+  */
+ private interface StorageTypeSetter {
+ OpWriteBlockProto.Builder set(OpWriteBlockProto.Builder builder, Enum<?> storageType);
+ }
+
+ private static final StorageTypeSetter STORAGE_TYPE_SETTER;
+
+ /**
+  * Helper for calling the addBlock method on the namenode. Hadoop 2.8 or later has an
+  * additional addBlockFlags parameter. See createBlockAdder for more details.
+  */
+ private interface BlockAdder {
+
+ LocatedBlock addBlock(ClientProtocol namenode, String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludeNodes, long fileId, String[] favoredNodes)
+ throws IOException;
+ }
+
+ private static final BlockAdder BLOCK_ADDER;
+
+ /**
+  * Helper for beginning and ending a file lease on a {@link DFSClient} for a given inode id.
+  * See createLeaseManager for the reflective implementation.
+  */
+ private interface LeaseManager {
+
+ void begin(DFSClient client, long inodeId);
+
+ void end(DFSClient client, long inodeId);
+ }
+
+ private static final LeaseManager LEASE_MANAGER;
+
+ /**
+  * Used to terminate a recoverFileLease call when the FileSystem is already closed.
+  * DFSClient's isClientRunning is not public, so it has to be called through reflection.
+  */
+ private interface DFSClientAdaptor {
+
+ boolean isClientRunning(DFSClient client);
+ }
+
+ private static final DFSClientAdaptor DFS_CLIENT_ADAPTOR;
+
+ /**
+  * Helper for converting an {@link ExtendedBlock} and a {@link Token} to their protobuf
+  * counterparts. See createPBHelper for the reflective implementation.
+  */
+ private interface PBHelper {
+
+ ExtendedBlockProto convert(ExtendedBlock b);
+
+ TokenProto convert(Token<?> tok);
+ }
+
+ private static final PBHelper PB_HELPER;
+
+ /**
+  * Helper for creating a {@link DataChecksum} for a given {@link DFSClient}. See
+  * createChecksumCreater for the reflective implementations.
+  */
+ private interface ChecksumCreater {
+ DataChecksum createChecksum(DFSClient client);
+ }
+
+ private static final ChecksumCreater CHECKSUM_CREATER;
+
+ // helper class for creating files.
+ private interface FileCreator {
+ default HdfsFileStatus create(ClientProtocol instance, String src, FsPermission masked,
+ String clientName, EnumSetWritable<CreateFlag> flag, boolean createParent,
+ short replication, long blockSize, CryptoProtocolVersion[] supportedVersions)
+ throws Exception {
+ try {
+ return (HdfsFileStatus) createObject(instance, src, masked, clientName, flag, createParent,
+ replication, blockSize, supportedVersions);
+ } catch (InvocationTargetException e) {
+ if (e.getCause() instanceof Exception) {
+ throw (Exception) e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+ };
+
+ Object createObject(ClientProtocol instance, String src, FsPermission masked, String clientName,
+ EnumSetWritable<CreateFlag> flag, boolean createParent, short replication, long blockSize,
+ CryptoProtocolVersion[] supportedVersions) throws Exception;
+ }
+
+ private static final FileCreator FILE_CREATOR;
+
+ /**
+  * Creates an adaptor that calls DFSClient's non-public {@code isClientRunning} method
+  * through reflection. Reflection failures during a call surface as {@link RuntimeException}.
+  */
+ private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
+   Method runningCheck = DFSClient.class.getDeclaredMethod("isClientRunning");
+   runningCheck.setAccessible(true);
+   return client -> {
+     try {
+       return (Boolean) runningCheck.invoke(client);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Creates a lease manager that calls DFSClient's {@code beginFileLease} and
+  * {@code endFileLease} through reflection. The begin call passes {@code null} as the
+  * output stream argument.
+  */
+ private static LeaseManager createLeaseManager() throws NoSuchMethodException {
+   Method beginLease =
+       DFSClient.class.getDeclaredMethod("beginFileLease", long.class, DFSOutputStream.class);
+   beginLease.setAccessible(true);
+   Method endLease = DFSClient.class.getDeclaredMethod("endFileLease", long.class);
+   endLease.setAccessible(true);
+   return new LeaseManager() {
+
+     @Override
+     public void begin(DFSClient dfsClient, long inode) {
+       try {
+         beginLease.invoke(dfsClient, inode, null);
+       } catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException(e);
+       }
+     }
+
+     @Override
+     public void end(DFSClient dfsClient, long inode) {
+       try {
+         endLease.invoke(dfsClient, inode);
+       } catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException(e);
+       }
+     }
+   };
+ }
+
+ /**
+  * Creates the status getter that reads the status from the ack's flag list. When the flag
+  * list is empty, the header is built by combining the first reply with the DISABLED ECN
+  * value. Fails with an {@link Error} when the PipelineAck.ECN class cannot be loaded.
+  */
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter27()
+     throws NoSuchMethodException {
+   Method flagListGetter = PipelineAckProto.class.getMethod("getFlagList");
+   @SuppressWarnings("rawtypes")
+   Class<? extends Enum> ecnClass;
+   try {
+     ecnClass = Class.forName("org.apache.hadoop.hdfs.protocol.datatransfer.PipelineAck$ECN")
+         .asSubclass(Enum.class);
+   } catch (ClassNotFoundException e) {
+     String msg = "Couldn't properly initialize the PipelineAck.ECN class. Please " +
+         "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+         "HBASE-16110 for more information.";
+     LOG.error(msg, e);
+     throw new Error(msg, e);
+   }
+   @SuppressWarnings("unchecked")
+   Enum<?> disabledECN = Enum.valueOf(ecnClass, "DISABLED");
+   Method replyGetter = PipelineAckProto.class.getMethod("getReply", int.class);
+   Method headerCombiner = PipelineAck.class.getMethod("combineHeader", ecnClass, Status.class);
+   Method statusFromHeader = PipelineAck.class.getMethod("getStatusFromHeader", int.class);
+   return ack -> {
+     try {
+       @SuppressWarnings("unchecked")
+       List<Integer> flags = (List<Integer>) flagListGetter.invoke(ack);
+       Integer header;
+       if (!flags.isEmpty()) {
+         header = flags.get(0);
+       } else {
+         // No flag present: derive the header from the first reply with ECN disabled.
+         Status firstReply = (Status) replyGetter.invoke(ack, 0);
+         header = (Integer) headerCombiner.invoke(null, disabledECN, firstReply);
+       }
+       return (Status) statusFromHeader.invoke(null, header);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Creates the status getter that calls {@code getStatus(0)} on the ack through reflection.
+  */
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter26()
+     throws NoSuchMethodException {
+   Method statusGetter = PipelineAckProto.class.getMethod("getStatus", int.class);
+   return ack -> {
+     try {
+       return (Status) statusGetter.invoke(ack, 0);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Creates the status getter, trying the flag-based variant first and falling back to the
+  * getStatus-based variant when an expected method is missing.
+  */
+ private static PipelineAckStatusGetter createPipelineAckStatusGetter()
+     throws NoSuchMethodException {
+   try {
+     return createPipelineAckStatusGetter27();
+   } catch (NoSuchMethodException e) {
+     LOG.debug("Can not get expected method " + e.getMessage() +
+         ", this usually because your Hadoop is pre 2.7.0, " +
+         "try the methods in Hadoop 2.6.x instead.");
+     return createPipelineAckStatusGetter26();
+   }
+ }
+
+ /**
+  * Creates a setter that maps a storage type enum to the {@link StorageTypeProto} constant
+  * of the same name and sets it on the builder through reflection.
+  */
+ private static StorageTypeSetter createStorageTypeSetter() throws NoSuchMethodException {
+   Method storageTypeSetter =
+       OpWriteBlockProto.Builder.class.getMethod("setStorageType", StorageTypeProto.class);
+   ImmutableMap.Builder<String, StorageTypeProto> byName = ImmutableMap.builder();
+   for (StorageTypeProto proto : StorageTypeProto.values()) {
+     byName.put(proto.name(), proto);
+   }
+   ImmutableMap<String, StorageTypeProto> name2ProtoEnum = byName.build();
+   return (builder, storageType) -> {
+     Object protoEnum = name2ProtoEnum.get(storageType.name());
+     try {
+       storageTypeSetter.invoke(builder, protoEnum);
+     } catch (IllegalAccessException | IllegalArgumentException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+     return builder;
+   };
+ }
+
+ /**
+  * Creates a block adder around the first {@code addBlock} method found on
+  * {@link ClientProtocol}. When the method's last parameter is not {@code String[]}, an
+  * extra trailing {@code null} argument is passed.
+  *
+  * @throws NoSuchMethodException if ClientProtocol has no addBlock method
+  */
+ private static BlockAdder createBlockAdder() throws NoSuchMethodException {
+   for (Method candidate : ClientProtocol.class.getMethods()) {
+     if (!candidate.getName().equals("addBlock")) {
+       continue;
+     }
+     Class<?>[] paramTypes = candidate.getParameterTypes();
+     boolean favoredNodesIsLast = paramTypes[paramTypes.length - 1] == String[].class;
+     return (namenode, src, clientName, previous, excludeNodes, fileId, favoredNodes) -> {
+       try {
+         if (favoredNodesIsLast) {
+           return (LocatedBlock) candidate.invoke(namenode, src, clientName, previous,
+             excludeNodes, fileId, favoredNodes);
+         }
+         // The method takes one more parameter after favoredNodes; pass null for it.
+         return (LocatedBlock) candidate.invoke(namenode, src, clientName, previous,
+           excludeNodes, fileId, favoredNodes, null);
+       } catch (IllegalAccessException e) {
+         throw new RuntimeException(e);
+       } catch (InvocationTargetException e) {
+         Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+         throw new RuntimeException(e);
+       }
+     };
+   }
+   throw new NoSuchMethodException("Can not find addBlock method in ClientProtocol");
+ }
+
+ /**
+  * Creates a proto converter backed by the static {@code convert} methods of
+  * PBHelperClient, falling back to PBHelper when PBHelperClient cannot be loaded.
+  */
+ private static PBHelper createPBHelper() throws NoSuchMethodException {
+   String clazzName = "org.apache.hadoop.hdfs.protocolPB.PBHelperClient";
+   Class<?> helperClass;
+   try {
+     helperClass = Class.forName(clazzName);
+   } catch (ClassNotFoundException e) {
+     helperClass = org.apache.hadoop.hdfs.protocolPB.PBHelper.class;
+     LOG.debug("" + clazzName + " not found (Hadoop is pre-2.8.0?); using " +
+         helperClass.toString() + " instead.");
+   }
+   Method blockConverter = helperClass.getMethod("convert", ExtendedBlock.class);
+   Method tokenConverter = helperClass.getMethod("convert", Token.class);
+   return new PBHelper() {
+
+     @Override
+     public ExtendedBlockProto convert(ExtendedBlock block) {
+       try {
+         return (ExtendedBlockProto) blockConverter.invoke(null, block);
+       } catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException(e);
+       }
+     }
+
+     @Override
+     public TokenProto convert(Token<?> token) {
+       try {
+         return (TokenProto) tokenConverter.invoke(null, token);
+       } catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException(e);
+       }
+     }
+   };
+ }
+
+ /**
+  * Creates a checksum creater around the first public {@code createChecksum} method found
+  * on {@code confClass}. The method is invoked on the client's conf object with a single
+  * {@code null} argument.
+  *
+  * @throws NoSuchMethodException if {@code confClass} has no createChecksum method
+  */
+ private static ChecksumCreater createChecksumCreater28(Method getConfMethod, Class<?> confClass)
+     throws NoSuchMethodException {
+   for (Method candidate : confClass.getMethods()) {
+     if (!candidate.getName().equals("createChecksum")) {
+       continue;
+     }
+     return client -> {
+       try {
+         Object clientConf = getConfMethod.invoke(client);
+         return (DataChecksum) candidate.invoke(clientConf, (Object) null);
+       } catch (IllegalAccessException | InvocationTargetException e) {
+         throw new RuntimeException(e);
+       }
+     };
+   }
+   throw new NoSuchMethodException("Can not find createChecksum method in DfsClientConf");
+ }
+
+ /**
+  * Creates a checksum creater around the no-argument {@code createChecksum} method declared
+  * on {@code confClass}, which is made accessible and invoked on the client's conf object.
+  */
+ private static ChecksumCreater createChecksumCreater27(Method getConfMethod, Class<?> confClass)
+     throws NoSuchMethodException {
+   Method checksumFactory = confClass.getDeclaredMethod("createChecksum");
+   checksumFactory.setAccessible(true);
+   return client -> {
+     try {
+       Object clientConf = getConfMethod.invoke(client);
+       return (DataChecksum) checksumFactory.invoke(clientConf);
+     } catch (IllegalAccessException | InvocationTargetException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Creates the checksum creater. Uses the DfsClientConf class when it can be loaded and
+  * falls back to the nested DFSClient$Conf class otherwise.
+  */
+ private static ChecksumCreater createChecksumCreater()
+     throws NoSuchMethodException, ClassNotFoundException {
+   Method getConfMethod = DFSClient.class.getMethod("getConf");
+   Class<?> clientConfClass;
+   try {
+     clientConfClass = Class.forName("org.apache.hadoop.hdfs.client.impl.DfsClientConf");
+   } catch (ClassNotFoundException e) {
+     LOG.debug("No DfsClientConf class found, should be hadoop 2.7-", e);
+     return createChecksumCreater27(getConfMethod,
+       Class.forName("org.apache.hadoop.hdfs.DFSClient$Conf"));
+   }
+   return createChecksumCreater28(getConfMethod, clientConfClass);
+ }
+
+ /**
+  * Creates a file creator around the {@code ClientProtocol.create} overload that takes an
+  * extra trailing {@code String} parameter; {@code null} is passed for that parameter.
+  */
+ private static FileCreator createFileCreator3() throws NoSuchMethodException {
+   Method createMethod = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+     String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+     CryptoProtocolVersion[].class, String.class);
+
+   return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+       supportedVersions) -> (HdfsFileStatus) createMethod.invoke(instance, src, masked,
+         clientName, flag, createParent, replication, blockSize, supportedVersions, null);
+ }
+
+ /**
+  * Builds a {@link FileCreator} bound to the {@code ClientProtocol.create} overload whose last
+  * parameter is the {@code CryptoProtocolVersion[]} array.
+  *
+  * @throws NoSuchMethodException if {@link ClientProtocol} has no such overload
+  */
+ private static FileCreator createFileCreator2() throws NoSuchMethodException {
+   Method create = ClientProtocol.class.getMethod("create", String.class, FsPermission.class,
+     String.class, EnumSetWritable.class, boolean.class, short.class, long.class,
+     CryptoProtocolVersion[].class);
+
+   return (instance, src, masked, clientName, flag, createParent, replication, blockSize,
+       supportedVersions) -> (HdfsFileStatus) create.invoke(instance, src, masked, clientName,
+         flag, createParent, replication, blockSize, supportedVersions);
+ }
+
+ /**
+  * Picks the {@link FileCreator} matching the {@code ClientProtocol.create} signature available
+  * at runtime: the overload with the trailing {@code String} parameter is tried first, then the
+  * shorter overload.
+  *
+  * @throws NoSuchMethodException if neither overload exists
+  */
+ private static FileCreator createFileCreator() throws NoSuchMethodException {
+   try {
+     return createFileCreator3();
+   } catch (NoSuchMethodException e) {
+     // Keep the cause in the debug log, as createChecksumCreater does for its fallback.
+     LOG.debug("ClientProtocol::create wrong number of arguments, should be hadoop 2.x", e);
+   }
+   return createFileCreator2();
+ }
+
+ /**
+  * A {@link CancelableProgressable} that cancels the processing if the DFSClient is already
+  * closed: {@link #progress()} simply reports whether the wrapped client is still running.
+  */
+ static final class CancelOnClose implements CancelableProgressable {
+
+   private final DFSClient dfsClient;
+
+   public CancelOnClose(DFSClient client) {
+     dfsClient = client;
+   }
+
+   @Override
+   public boolean progress() {
+     return DFS_CLIENT_ADAPTOR.isClientRunning(dfsClient);
+   }
+ }
+
+ // Resolve every reflective accessor once, when the class is initialized. If any of them cannot
+ // be created the failure is logged and rethrown as an Error, so class initialization fails
+ // instead of leaving some accessors unset.
+ static {
+ try {
+ PIPELINE_ACK_STATUS_GETTER = createPipelineAckStatusGetter();
+ STORAGE_TYPE_SETTER = createStorageTypeSetter();
+ BLOCK_ADDER = createBlockAdder();
+ LEASE_MANAGER = createLeaseManager();
+ DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
+ PB_HELPER = createPBHelper();
+ CHECKSUM_CREATER = createChecksumCreater();
+ FILE_CREATOR = createFileCreator();
+ } catch (Exception e) {
+ String msg = "Couldn't properly initialize access to HDFS internals. Please " +
+ "update your WAL Provider to not make use of the 'asyncfs' provider. See " +
+ "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
+ }
+
+ /** Delegates to {@code LEASE_MANAGER.begin} for the given client and inode id. */
+ static void beginFileLease(DFSClient client, long inodeId) {
+ LEASE_MANAGER.begin(client, inodeId);
+ }
+
+ /** Delegates to {@code LEASE_MANAGER.end} for the given client and inode id. */
+ static void endFileLease(DFSClient client, long inodeId) {
+ LEASE_MANAGER.end(client, inodeId);
+ }
+
+ /** Creates a {@link DataChecksum} for the client through the resolved {@code CHECKSUM_CREATER}. */
+ static DataChecksum createChecksum(DFSClient client) {
+ return CHECKSUM_CREATER.createChecksum(client);
+ }
+
+ /** Extracts the {@link Status} from a pipeline ack through {@code PIPELINE_ACK_STATUS_GETTER}. */
+ static Status getStatus(PipelineAckProto ack) {
+ return PIPELINE_ACK_STATUS_GETTER.get(ack);
+ }
+
+ /**
+ * Appends the handlers that wait for the datanode's {@link BlockOpResponseProto} reply to the
+ * channel's pipeline: an idle-state handler using {@code timeoutMs} as reader timeout, the
+ * protobuf frame decoder and message decoder, and a handler that settles {@code promise}.
+ * <p>
+ * On a successful response the handlers added here are removed again, auto read is switched off
+ * and {@code promise} is completed with the channel. A closed connection, a read timeout or any
+ * exception reaching {@code exceptionCaught} fails {@code promise} instead.
+ *
+ * @param channel connection to the datanode
+ * @param dnInfo datanode description, only used in error messages
+ * @param promise settled with the channel on success or with the failure cause
+ * @param timeoutMs reader idle timeout in milliseconds
+ */
+ private static void processWriteBlockResponse(Channel channel, DatanodeInfo dnInfo,
+ Promise<Channel> promise, int timeoutMs) {
+ channel.pipeline().addLast(new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS),
+ new ProtobufVarint32FrameDecoder(),
+ new ProtobufDecoder(BlockOpResponseProto.getDefaultInstance()),
+ new SimpleChannelInboundHandler<BlockOpResponseProto>() {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext ctx, BlockOpResponseProto resp)
+ throws Exception {
+ Status pipelineStatus = resp.getStatus();
+ if (PipelineAck.isRestartOOBStatus(pipelineStatus)) {
+ throw new IOException("datanode " + dnInfo + " is restarting");
+ }
+ String logInfo = "ack with firstBadLink as " + resp.getFirstBadLink();
+ if (resp.getStatus() != Status.SUCCESS) {
+ if (resp.getStatus() == Status.ERROR_ACCESS_TOKEN) {
+ throw new InvalidBlockTokenException("Got access token error" + ", status message " +
+ resp.getMessage() + ", " + logInfo);
+ } else {
+ throw new IOException("Got error" + ", status=" + resp.getStatus().name() +
+ ", status message " + resp.getMessage() + ", " + logInfo);
+ }
+ }
+ // success
+ ChannelPipeline p = ctx.pipeline();
+ // Pop handlers from the tail of the pipeline until the IdleStateHandler added above has
+ // been removed (it is the first of the handlers this method installs).
+ for (ChannelHandler handler; (handler = p.removeLast()) != null;) {
+ // do not remove all handlers because we may have wrap or unwrap handlers at the header
+ // of pipeline.
+ if (handler instanceof IdleStateHandler) {
+ break;
+ }
+ }
+ // Disable auto read here. Enable it after we setup the streaming pipeline in
+ // FanOutOneBlockAsyncDFSOutput.
+ ctx.channel().config().setAutoRead(false);
+ promise.trySuccess(ctx.channel());
+ }
+
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ promise.tryFailure(new IOException("connection to " + dnInfo + " is closed"));
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ // Only a reader-idle event is treated as the response timeout; other events pass on.
+ if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+ promise
+ .tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ promise.tryFailure(cause);
+ }
+ });
+ }
+
+ /**
+  * Serializes the write-block request and sends it to the datanode.
+  * <p>
+  * The storage type is applied to the builder first; the buffer then receives the data transfer
+  * protocol version (short), the {@code WRITE_BLOCK} op code (byte) and the length-delimited
+  * protobuf message, and is written and flushed on the channel.
+  *
+  * @param channel connection to the datanode
+  * @param storageType storage type handed to {@code STORAGE_TYPE_SETTER}
+  * @param writeBlockProtoBuilder builder holding the rest of the request
+  * @throws IOException if the message cannot be written into the buffer
+  */
+ private static void requestWriteBlock(Channel channel, Enum<?> storageType,
+     OpWriteBlockProto.Builder writeBlockProtoBuilder) throws IOException {
+   OpWriteBlockProto request =
+     STORAGE_TYPE_SETTER.set(writeBlockProtoBuilder, storageType).build();
+   int bodyLen = request.getSerializedSize();
+   // 2 bytes version + 1 byte op code + varint length prefix + message body
+   int capacity = 3 + CodedOutputStream.computeRawVarint32Size(bodyLen) + bodyLen;
+   ByteBuf out = channel.alloc().buffer(capacity);
+   out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
+   out.writeByte(Op.WRITE_BLOCK.code);
+   request.writeDelimitedTo(new ByteBufOutputStream(out));
+   channel.writeAndFlush(out);
+ }
+
+ /**
+  * Starts SASL negotiation on a freshly connected channel and, once it succeeds, installs the
+  * response handlers and sends the write-block request. A failed negotiation fails
+  * {@code promise} with the negotiation's cause.
+  *
+  * @param promise settled by the response handlers, or failed here when SASL fails
+  * @throws IOException declared for failures raised while starting the negotiation
+  */
+ private static void initialize(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+     Enum<?> storageType, OpWriteBlockProto.Builder writeBlockProtoBuilder, int timeoutMs,
+     DFSClient client, Token<BlockTokenIdentifier> accessToken, Promise<Channel> promise)
+     throws IOException {
+   Promise<Void> saslDone = channel.eventLoop().newPromise();
+   trySaslNegotiate(conf, channel, dnInfo, timeoutMs, client, accessToken, saslDone);
+   saslDone.addListener(new FutureListener<Void>() {
+
+     @Override
+     public void operationComplete(Future<Void> future) throws Exception {
+       if (!future.isSuccess()) {
+         promise.tryFailure(future.cause());
+         return;
+       }
+       // setup response processing pipeline first, then send request.
+       processWriteBlockResponse(channel, dnInfo, promise, timeoutMs);
+       requestWriteBlock(channel, storageType, writeBlockProtoBuilder);
+     }
+   });
+ }
+
+ /**
+ * Opens one connection per location of {@code locatedBlock} and starts the write-block
+ * handshake on each of them.
+ * <p>
+ * The returned list holds one future per datanode, in the same order as
+ * {@code locatedBlock.getLocations()}. Each future is completed by the handshake started in
+ * {@code initialize}, or failed with the connect failure.
+ *
+ * @param conf supplies the "use datanode hostname" flag and the socket timeout
+ * @param summer checksum converted into the requested checksum of the write-block request
+ * @param eventLoopGroup event loop group used for the bootstrap and for creating the promises
+ * @param channelClass channel implementation handed to the bootstrap
+ * @return the per-datanode connection futures
+ */
+ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSClient client,
+ String clientName, LocatedBlock locatedBlock, long maxBytesRcvd, long latestGS,
+ BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
+ Class<? extends Channel> channelClass) {
+ Enum<?>[] storageTypes = locatedBlock.getStorageTypes();
+ DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
+ boolean connectToDnViaHostname =
+ conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
+ int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
+ // Work on a copy of the block whose length is set to the located block's size.
+ ExtendedBlock blockCopy = new ExtendedBlock(locatedBlock.getBlock());
+ blockCopy.setNumBytes(locatedBlock.getBlockSize());
+ ClientOperationHeaderProto header = ClientOperationHeaderProto.newBuilder()
+ .setBaseHeader(BaseHeaderProto.newBuilder().setBlock(PB_HELPER.convert(blockCopy))
+ .setToken(PB_HELPER.convert(locatedBlock.getBlockToken())))
+ .setClientName(clientName).build();
+ ChecksumProto checksumProto = DataTransferProtoUtil.toProto(summer);
+ // NOTE(review): this single builder is shared by every connection below; the per-datanode
+ // storage type is applied to it later in requestWriteBlock - confirm that is safe when the
+ // connections complete on different event loops.
+ OpWriteBlockProto.Builder writeBlockProtoBuilder = OpWriteBlockProto.newBuilder()
+ .setHeader(header).setStage(OpWriteBlockProto.BlockConstructionStage.valueOf(stage.name()))
+ .setPipelineSize(1).setMinBytesRcvd(locatedBlock.getBlock().getNumBytes())
+ .setMaxBytesRcvd(maxBytesRcvd).setLatestGenerationStamp(latestGS)
+ .setRequestedChecksum(checksumProto)
+ .setCachingStrategy(CachingStrategyProto.newBuilder().setDropBehind(true).build());
+ List<Future<Channel>> futureList = new ArrayList<>(datanodeInfos.length);
+ for (int i = 0; i < datanodeInfos.length; i++) {
+ DatanodeInfo dnInfo = datanodeInfos[i];
+ Enum<?> storageType = storageTypes[i];
+ Promise<Channel> promise = eventLoopGroup.next().newPromise();
+ futureList.add(promise);
+ String dnAddr = dnInfo.getXferAddr(connectToDnViaHostname);
+ new Bootstrap().group(eventLoopGroup).channel(channelClass)
+ .option(CONNECT_TIMEOUT_MILLIS, timeoutMs).handler(new ChannelInitializer<Channel>() {
+
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ // we need to get the remote address of the channel so we can only move on after
+ // channel connected. Leave an empty implementation here because netty does not allow
+ // a null handler.
+ }
+ }).connect(NetUtils.createSocketAddr(dnAddr)).addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ // Connected: continue with SASL and the write-block request; otherwise fail the promise.
+ if (future.isSuccess()) {
+ initialize(conf, future.channel(), dnInfo, storageType, writeBlockProtoBuilder,
+ timeoutMs, client, locatedBlock.getBlockToken(), promise);
+ } else {
+ promise.tryFailure(future.cause());
+ }
+ }
+ });
+ }
+ return futureList;
+ }
+
+ /**
+ * Wraps any exception other than a RemoteException that is thrown while calling create on the
+ * namenode. The original exception is kept as the cause.
+ */
+ public static class NameNodeException extends IOException {
+
+ private static final long serialVersionUID = 3143237406477095390L;
+
+ /**
+ * @param cause the exception raised by the create call
+ */
+ public NameNodeException(Throwable cause) {
+ super(cause);
+ }
+ }
+
+ /**
+  * Creates the file on the namenode, allocates its first block and connects to every datanode
+  * of that block, retrying the whole sequence on failure.
+  * <p>
+  * Each attempt creates the file, begins the file lease, adds a block (excluding datanodes that
+  * failed in earlier attempts) and waits for all datanode connections. When an attempt fails,
+  * already established channels are closed and the lease is ended before the next attempt.
+  * <ul>
+  * <li>A {@link RemoteException} is retried only while {@link #shouldRetryCreate} accepts it and
+  * the retry limit is not reached; otherwise its unwrapped form is thrown.</li>
+  * <li>Any other {@link IOException} is retried, after a back-off sleep and with overwrite
+  * switched on, until the retry limit is reached.</li>
+  * </ul>
+  *
+  * @throws NameNodeException if the create call fails with something other than a
+  *           RemoteException
+  * @throws InterruptedIOException if the thread is interrupted during the back-off sleep; the
+  *           original InterruptedException is attached as the cause
+  * @throws IOException if the output cannot be created within the allowed retries
+  */
+ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
+     boolean overwrite, boolean createParent, short replication, long blockSize,
+     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+   Configuration conf = dfs.getConf();
+   FSUtils fsUtils = FSUtils.getInstance(dfs, conf);
+   DFSClient client = dfs.getClient();
+   String clientName = client.getClientName();
+   ClientProtocol namenode = client.getNamenode();
+   int createMaxRetries = conf.getInt(ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES,
+     DEFAULT_ASYNC_DFS_OUTPUT_CREATE_MAX_RETRIES);
+   DatanodeInfo[] excludesNodes = EMPTY_DN_ARRAY;
+   for (int retry = 0;; retry++) {
+     HdfsFileStatus stat;
+     try {
+       stat = FILE_CREATOR.create(namenode, src,
+         FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
+         new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
+         createParent, replication, blockSize, CryptoProtocolVersion.supported());
+     } catch (Exception e) {
+       if (e instanceof RemoteException) {
+         throw (RemoteException) e;
+       } else {
+         throw new NameNodeException(e);
+       }
+     }
+     beginFileLease(client, stat.getFileId());
+     boolean succ = false;
+     LocatedBlock locatedBlock = null;
+     List<Future<Channel>> futureList = null;
+     try {
+       DataChecksum summer = createChecksum(client);
+       locatedBlock = BLOCK_ADDER.addBlock(namenode, src, client.getClientName(), null,
+         excludesNodes, stat.getFileId(), null);
+       List<Channel> datanodeList = new ArrayList<>();
+       futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
+         PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
+       for (int i = 0, n = futureList.size(); i < n; i++) {
+         try {
+           datanodeList.add(futureList.get(i).syncUninterruptibly().getNow());
+         } catch (Exception e) {
+           // exclude the broken DN next time
+           excludesNodes = ArrayUtils.add(excludesNodes, locatedBlock.getLocations()[i]);
+           throw e;
+         }
+       }
+       Encryptor encryptor = createEncryptor(conf, stat, client);
+       FanOutOneBlockAsyncDFSOutput output =
+         new FanOutOneBlockAsyncDFSOutput(conf, fsUtils, dfs, client, namenode, clientName, src,
+           stat.getFileId(), locatedBlock, encryptor, datanodeList, summer, ALLOC);
+       succ = true;
+       return output;
+     } catch (RemoteException e) {
+       LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+       if (shouldRetryCreate(e)) {
+         if (retry >= createMaxRetries) {
+           throw e.unwrapRemoteException();
+         }
+       } else {
+         throw e.unwrapRemoteException();
+       }
+     } catch (IOException e) {
+       LOG.warn("create fan-out dfs output {} failed, retry = {}", src, retry, e);
+       if (retry >= createMaxRetries) {
+         throw e;
+       }
+       // overwrite the old broken file.
+       overwrite = true;
+       try {
+         Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+       } catch (InterruptedException ie) {
+         // Keep the interruption as the cause so callers can see why the retry was abandoned.
+         InterruptedIOException iioe = new InterruptedIOException(
+           "Interrupted while waiting to retry creating " + src);
+         iioe.initCause(ie);
+         throw iioe;
+       }
+     } finally {
+       if (!succ) {
+         // Close whatever channels do get established, including ones that connect later.
+         if (futureList != null) {
+           for (Future<Channel> f : futureList) {
+             f.addListener(new FutureListener<Channel>() {
+
+               @Override
+               public void operationComplete(Future<Channel> future) throws Exception {
+                 if (future.isSuccess()) {
+                   future.getNow().close();
+                 }
+               }
+             });
+           }
+         }
+         endFileLease(client, stat.getFileId());
+       }
+     }
+   }
+ }
+
+ /**
+  * Creates a {@link FanOutOneBlockAsyncDFSOutput} for the given path, resolving symbolic links
+  * through a {@link FileSystemLinkResolver}. This method may block, so do not call it from
+  * inside an {@link EventLoop}.
+  *
+  * @throws UnsupportedOperationException if link resolution leads to another file system
+  */
+ public static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, Path f,
+     boolean overwrite, boolean createParent, short replication, long blockSize,
+     EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
+   FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput> resolver =
+     new FileSystemLinkResolver<FanOutOneBlockAsyncDFSOutput>() {
+
+       @Override
+       public FanOutOneBlockAsyncDFSOutput doCall(Path p)
+           throws IOException, UnresolvedLinkException {
+         String resolvedSrc = p.toUri().getPath();
+         return createOutput(dfs, resolvedSrc, overwrite, createParent, replication, blockSize,
+           eventLoopGroup, channelClass);
+       }
+
+       @Override
+       public FanOutOneBlockAsyncDFSOutput next(FileSystem fs, Path p) throws IOException {
+         throw new UnsupportedOperationException();
+       }
+     };
+   return resolver.resolve(dfs, f);
+ }
+
+ public static boolean shouldRetryCreate(RemoteException e) {
+ // RetryStartFileException is introduced in HDFS 2.6+, so here we can only use the class name.
+ // For exceptions other than this, we just throw it out. This is same with
+ // DFSOutputStream.newStreamForCreate.
+ return e.getClassName().endsWith("RetryStartFileException");
+ }
+
+ /**
+  * Asks the namenode to complete the file, retrying without an upper bound until it reports
+  * completion or the lease turns out to be expired.
+  * <p>
+  * The file lease is ended once the namenode confirms completion. Every failed attempt is
+  * logged and followed by a back-off sleep.
+  */
+ static void completeFile(DFSClient client, ClientProtocol namenode, String src, String clientName,
+     ExtendedBlock block, long fileId) {
+   int retry = 0;
+   while (true) {
+     try {
+       boolean completed = namenode.complete(src, clientName, block, fileId);
+       if (completed) {
+         endFileLease(client, fileId);
+         return;
+       }
+       LOG.warn("complete file " + src + " not finished, retry = " + retry);
+     } catch (RemoteException e) {
+       IOException unwrapped = e.unwrapRemoteException();
+       if (unwrapped instanceof LeaseExpiredException) {
+         // nothing more can be done without the lease
+         LOG.warn("lease for file " + src + " is expired, give up", e);
+         return;
+       }
+       LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+     } catch (Exception e) {
+       LOG.warn("complete file " + src + " failed, retry = " + retry, e);
+     }
+     sleepIgnoreInterrupt(retry);
+     retry++;
+   }
+ }
+
+ /**
+  * Sleeps for the back-off pause computed by {@code ConnectionUtils.getPauseTime(100, retry)}.
+  * An interrupt only cuts the sleep short; it is deliberately not propagated.
+  *
+  * @param retry number of attempts made so far
+  */
+ static void sleepIgnoreInterrupt(int retry) {
+   long pauseMs = ConnectionUtils.getPauseTime(100, retry);
+   try {
+     Thread.sleep(pauseMs);
+   } catch (InterruptedException ignored) {
+     // intentionally ignored, as the method name says
+   }
+ }
+}
diff --git a/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
new file mode 100644
index 0000000..08acb13
--- /dev/null
+++ b/metron-platform/metron-hbase/metron-hbase-common/src/test/java/org/apache/hadoop/hbase/io/asyncfs/FanOutOneBlockAsyncDFSOutputSaslHelper.java
@@ -0,0 +1,783 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.asyncfs;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY;
+import static org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleState.READER_IDLE;
+
+import com.google.protobuf.ByteString;
+import com.google.protobuf.CodedOutputStream;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.security.GeneralSecurityException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.CallbackHandler;
+import javax.security.auth.callback.NameCallback;
+import javax.security.auth.callback.PasswordCallback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.RealmCallback;
+import javax.security.sasl.RealmChoiceCallback;
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CipherOption;
+import org.apache.hadoop.crypto.CipherSuite;
+import org.apache.hadoop.crypto.CryptoCodec;
+import org.apache.hadoop.crypto.Decryptor;
+import org.apache.hadoop.crypto.Encryptor;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProvider.KeyVersion;
+import org.apache.hadoop.fs.FileEncryptionInfo;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
+import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
+import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DataTransferEncryptorMessageProto.DataTransferEncryptorStatus;
+import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.security.SaslPropertiesResolver;
+import org.apache.hadoop.security.SaslRpcServer.QualityOfProtection;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Charsets;
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableSet;
+import org.apache.hbase.thirdparty.com.google.common.collect.Maps;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream;
+import org.apache.hbase.thirdparty.io.netty.buffer.CompositeByteBuf;
+import org.apache.hbase.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelDuplexHandler;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelOutboundHandlerAdapter;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline;
+import org.apache.hbase.thirdparty.io.netty.channel.ChannelPromise;
+import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.MessageToByteEncoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateEvent;
+import org.apache.hbase.thirdparty.io.netty.handler.timeout.IdleStateHandler;
+import org.apache.hbase.thirdparty.io.netty.util.concurrent.Promise;
+
+/**
+ * NOTE - this class is copied from HBase to get around https://issues.apache.org/jira/browse/HBASE-22394
+ *
+ * Helper class for adding sasl support for {@link FanOutOneBlockAsyncDFSOutput}.
+ */
+@InterfaceAudience.Private
+public final class FanOutOneBlockAsyncDFSOutputSaslHelper {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FanOutOneBlockAsyncDFSOutputSaslHelper.class);
+
+ // Private constructor: this final helper class is not meant to be instantiated.
+ private FanOutOneBlockAsyncDFSOutputSaslHelper() {
+ }
+
+ // Server name, protocol and mechanism handed to Sasl.createSaslClient by SaslNegotiateHandler.
+ private static final String SERVER_NAME = "0";
+ private static final String PROTOCOL = "hdfs";
+ private static final String MECHANISM = "DIGEST-MD5";
+ // Written as a 4-byte int at the very start of the SASL exchange (see handlerAdded).
+ private static final int SASL_TRANSFER_MAGIC_NUMBER = 0xDEADBEEF;
+ // NOTE(review): presumably separates the parts of the SASL user name - usage is not visible
+ // in this part of the file, confirm.
+ private static final String NAME_DELIMITER = " ";
+
+ /**
+ * Read access to three private fields of {@link SaslDataTransferClient}. The implementation
+ * built by {@code createSaslAdaptor} reads them through reflection.
+ */
+ private interface SaslAdaptor {
+
+ TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient);
+
+ SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient);
+
+ AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient);
+ }
+
+ private static final SaslAdaptor SASL_ADAPTOR;
+
+ /**
+ * Creates the {@link Encryptor} for a file from its {@link FileEncryptionInfo}. Two reflective
+ * implementations exist, differing in where the key decryption method lives.
+ */
+ private interface TransparentCryptoHelper {
+
+ Encryptor createEncryptor(Configuration conf, FileEncryptionInfo feInfo, DFSClient client)
+ throws IOException;
+ }
+
+ private static final TransparentCryptoHelper TRANSPARENT_CRYPTO_HELPER;
+
+ private static SaslAdaptor createSaslAdaptor()
+ throws NoSuchFieldException, NoSuchMethodException {
+ Field saslPropsResolverField =
+ SaslDataTransferClient.class.getDeclaredField("saslPropsResolver");
+ saslPropsResolverField.setAccessible(true);
+ Field trustedChannelResolverField =
+ SaslDataTransferClient.class.getDeclaredField("trustedChannelResolver");
+ trustedChannelResolverField.setAccessible(true);
+ Field fallbackToSimpleAuthField =
+ SaslDataTransferClient.class.getDeclaredField("fallbackToSimpleAuth");
+ fallbackToSimpleAuthField.setAccessible(true);
+ return new SaslAdaptor() {
+
+ @Override
+ public TrustedChannelResolver getTrustedChannelResolver(SaslDataTransferClient saslClient) {
+ try {
+ return (TrustedChannelResolver) trustedChannelResolverField.get(saslClient);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public SaslPropertiesResolver getSaslPropsResolver(SaslDataTransferClient saslClient) {
+ try {
+ return (SaslPropertiesResolver) saslPropsResolverField.get(saslClient);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public AtomicBoolean getFallbackToSimpleAuth(SaslDataTransferClient saslClient) {
+ try {
+ return (AtomicBoolean) fallbackToSimpleAuthField.get(saslClient);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ };
+ }
+
+ /**
+  * Builds a {@link TransparentCryptoHelper} that decrypts the data encryption key through the
+  * non-public {@code DFSClient.decryptEncryptedDataEncryptionKey(FileEncryptionInfo)} method.
+  *
+  * @throws NoSuchMethodException if {@link DFSClient} does not declare that method
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelperWithoutHDFS12396()
+     throws NoSuchMethodException {
+   Method decryptKey = DFSClient.class
+     .getDeclaredMethod("decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class);
+   decryptKey.setAccessible(true);
+   return (conf, feInfo, client) -> {
+     try {
+       KeyVersion key = (KeyVersion) decryptKey.invoke(client, feInfo);
+       CryptoCodec codec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+       Encryptor result = codec.createEncryptor();
+       result.init(key.getMaterial(), feInfo.getIV());
+       return result;
+     } catch (InvocationTargetException e) {
+       // surface IOExceptions and unchecked failures as they are, wrap anything else
+       Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+       throw new RuntimeException(e.getTargetException());
+     } catch (GeneralSecurityException e) {
+       throw new IOException(e);
+     } catch (IllegalAccessException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Builds a {@link TransparentCryptoHelper} that decrypts the data encryption key through the
+  * static {@code HdfsKMSUtil.decryptEncryptedDataEncryptionKey(FileEncryptionInfo, KeyProvider)}
+  * method, passing the client's key provider.
+  *
+  * @throws ClassNotFoundException if {@code org.apache.hadoop.hdfs.HdfsKMSUtil} is not available
+  * @throws NoSuchMethodException if that class does not declare the method
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelperWithHDFS12396()
+     throws ClassNotFoundException, NoSuchMethodException {
+   Class<?> kmsUtil = Class.forName("org.apache.hadoop.hdfs.HdfsKMSUtil");
+   Method decryptKey = kmsUtil.getDeclaredMethod(
+     "decryptEncryptedDataEncryptionKey", FileEncryptionInfo.class, KeyProvider.class);
+   decryptKey.setAccessible(true);
+   return (conf, feInfo, client) -> {
+     try {
+       // static method, hence the null receiver
+       KeyVersion key = (KeyVersion) decryptKey.invoke(null, feInfo, client.getKeyProvider());
+       CryptoCodec codec = CryptoCodec.getInstance(conf, feInfo.getCipherSuite());
+       Encryptor result = codec.createEncryptor();
+       result.init(key.getMaterial(), feInfo.getIV());
+       return result;
+     } catch (InvocationTargetException e) {
+       // surface IOExceptions and unchecked failures as they are, wrap anything else
+       Throwables.propagateIfPossible(e.getTargetException(), IOException.class);
+       throw new RuntimeException(e.getTargetException());
+     } catch (GeneralSecurityException e) {
+       throw new IOException(e);
+     } catch (IllegalAccessException e) {
+       throw new RuntimeException(e);
+     }
+   };
+ }
+
+ /**
+  * Picks the {@link TransparentCryptoHelper} matching the HDFS client on the classpath: the
+  * {@link DFSClient} based variant first, then the {@code HdfsKMSUtil} based variant when the
+  * method is missing from {@link DFSClient}.
+  */
+ private static TransparentCryptoHelper createTransparentCryptoHelper()
+     throws NoSuchMethodException, ClassNotFoundException {
+   try {
+     return createTransparentCryptoHelperWithoutHDFS12396();
+   } catch (NoSuchMethodException e) {
+     LOG.debug("No decryptEncryptedDataEncryptionKey method in DFSClient," +
+       " should be hadoop version with HDFS-12396", e);
+     return createTransparentCryptoHelperWithHDFS12396();
+   }
+ }
+
+ // Resolve the reflective helpers once, when the class is initialized. Any failure is logged and
+ // rethrown as an Error, so class initialization fails instead of leaving a helper unset.
+ static {
+ try {
+ SASL_ADAPTOR = createSaslAdaptor();
+ TRANSPARENT_CRYPTO_HELPER = createTransparentCryptoHelper();
+ } catch (Exception e) {
+ String msg = "Couldn't properly initialize access to HDFS internals. Please "
+ + "update your WAL Provider to not make use of the 'asyncfs' provider. See "
+ + "HBASE-16110 for more information.";
+ LOG.error(msg, e);
+ throw new Error(msg, e);
+ }
+ }
+
+ /**
+  * Callback handler that supplies the user name, the password and the default realm text when
+  * the client-side SASL object asks for them.
+  */
+ private static final class SaslClientCallbackHandler implements CallbackHandler {
+
+   private final String userName;
+   private final char[] password;
+
+   /**
+    * Creates a new SaslClientCallbackHandler.
+    * @param userName SASL user name
+    * @param password SASL password
+    */
+   public SaslClientCallbackHandler(String userName, char[] password) {
+     this.userName = userName;
+     this.password = password;
+   }
+
+   /**
+    * Sorts the callbacks by type first and answers them only afterwards, so an unsupported
+    * callback is rejected before any value has been set.
+    */
+   @Override
+   public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+     NameCallback nameCb = null;
+     PasswordCallback passwordCb = null;
+     RealmCallback realmCb = null;
+     for (Callback cb : callbacks) {
+       if (cb instanceof RealmChoiceCallback) {
+         // accepted, but nothing is supplied for it
+         continue;
+       }
+       if (cb instanceof NameCallback) {
+         nameCb = (NameCallback) cb;
+       } else if (cb instanceof PasswordCallback) {
+         passwordCb = (PasswordCallback) cb;
+       } else if (cb instanceof RealmCallback) {
+         realmCb = (RealmCallback) cb;
+       } else {
+         throw new UnsupportedCallbackException(cb, "Unrecognized SASL client callback");
+       }
+     }
+     if (nameCb != null) {
+       nameCb.setName(userName);
+     }
+     if (passwordCb != null) {
+       passwordCb.setPassword(password);
+     }
+     if (realmCb != null) {
+       realmCb.setText(realmCb.getDefaultText());
+     }
+   }
+ }
+
+ private static final class SaslNegotiateHandler extends ChannelDuplexHandler {
+
+ private final Configuration conf;
+
+ private final Map<String, String> saslProps;
+
+ private final SaslClient saslClient;
+
+ private final int timeoutMs;
+
+ private final Promise<Void> promise;
+
+ private final DFSClient dfsClient;
+
+ private int step = 0;
+
+ /**
+ * Creates the handler and its SASL client. The client is created for the single mechanism
+ * {@code MECHANISM} with {@code PROTOCOL} and {@code SERVER_NAME}, and answers callbacks with
+ * the given user name and password.
+ *
+ * @param saslProps SASL properties passed to the client and later consulted for the requested QOP
+ * @param promise kept for use by the rest of the handler
+ * @throws SaslException if the SASL client cannot be created
+ */
+ public SaslNegotiateHandler(Configuration conf, String username, char[] password,
+ Map<String, String> saslProps, int timeoutMs, Promise<Void> promise,
+ DFSClient dfsClient) throws SaslException {
+ this.conf = conf;
+ this.saslProps = saslProps;
+ this.saslClient = Sasl.createSaslClient(new String[] { MECHANISM }, username, PROTOCOL,
+ SERVER_NAME, saslProps, new SaslClientCallbackHandler(username, password));
+ this.timeoutMs = timeoutMs;
+ this.promise = promise;
+ this.dfsClient = dfsClient;
+ }
+
+ /** Writes a SASL message carrying only the payload, without cipher options. */
+ private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload) throws IOException {
+ sendSaslMessage(ctx, payload, null);
+ }
+
+ /**
+  * Returns the cipher options to offer during negotiation.
+  * <p>
+  * Negotiate cipher suites if configured. Currently, the only supported cipher suite is
+  * AES/CTR/NoPadding, but the protocol allows multiple values for future expansion.
+  *
+  * @return {@code null} when no cipher suite is configured, otherwise a single-element list
+  * @throws IOException if the configured value is not the AES/CTR/NoPadding suite name
+  */
+ private List<CipherOption> getCipherOptions() throws IOException {
+   String configured = conf.get(DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY);
+   if (StringUtils.isBlank(configured)) {
+     return null;
+   }
+   if (configured.equals(CipherSuite.AES_CTR_NOPADDING.getName())) {
+     return Collections.singletonList(new CipherOption(CipherSuite.AES_CTR_NOPADDING));
+   }
+   throw new IOException(String.format("Invalid cipher suite, %s=%s",
+     DFS_ENCRYPT_DATA_TRANSFER_CIPHER_SUITES_KEY, configured));
+ }
+
+ /**
+  * Builds a {@code DataTransferEncryptorMessageProto} with status SUCCESS and writes it,
+  * length-delimited, to the channel. The message is written but not flushed here.
+  *
+  * @param payload SASL payload, skipped when {@code null}
+  * @param options cipher options to attach, skipped when {@code null}
+  * @throws IOException if the message cannot be serialized into the buffer
+  */
+ private void sendSaslMessage(ChannelHandlerContext ctx, byte[] payload,
+     List<CipherOption> options) throws IOException {
+   DataTransferEncryptorMessageProto.Builder message =
+     DataTransferEncryptorMessageProto.newBuilder();
+   message.setStatus(DataTransferEncryptorStatus.SUCCESS);
+   if (payload != null) {
+     // Was ByteStringer; fix w/o using ByteStringer. Its in hbase-protocol
+     // and we want to keep that out of hbase-server.
+     message.setPayload(ByteString.copyFrom(payload));
+   }
+   if (options != null) {
+     message.addAllCipherOption(PBHelperClient.convertCipherOptions(options));
+   }
+   DataTransferEncryptorMessageProto proto = message.build();
+   int bodyLen = proto.getSerializedSize();
+   // varint length prefix followed by the message body
+   int frameLen = bodyLen + CodedOutputStream.computeRawVarint32Size(bodyLen);
+   ByteBuf frame = ctx.alloc().buffer(frameLen);
+   proto.writeDelimitedTo(new ByteBufOutputStream(frame));
+   ctx.write(frame);
+ }
+
+ /**
+ * Opens the SASL exchange as soon as the handler joins the pipeline: writes the 4-byte magic
+ * number followed by an initial message with an empty payload, flushes both, and advances
+ * {@code step} so the next inbound message is handled as the first negotiation step.
+ */
+ @Override
+ public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+ ctx.write(ctx.alloc().buffer(4).writeInt(SASL_TRANSFER_MAGIC_NUMBER));
+ sendSaslMessage(ctx, new byte[0]);
+ ctx.flush();
+ step++;
+ }
+
+ /**
+ * Disposes the SASL client when the channel becomes inactive.
+ * NOTE(review): the event is not passed on to later handlers and the promise is not failed
+ * here - confirm that this is intended.
+ */
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ saslClient.dispose();
+ }
+
+ /**
+  * Turns an error status of the received message into an exception.
+  *
+  * @throws InvalidEncryptionKeyException on {@code ERROR_UNKNOWN_KEY}, after clearing the
+  *           client's cached data encryption key
+  * @throws IOException on {@code ERROR}
+  */
+ private void check(DataTransferEncryptorMessageProto proto) throws IOException {
+   DataTransferEncryptorStatus status = proto.getStatus();
+   if (status == DataTransferEncryptorStatus.ERROR_UNKNOWN_KEY) {
+     dfsClient.clearDataEncryptionKey();
+     throw new InvalidEncryptionKeyException(proto.getMessage());
+   }
+   if (status == DataTransferEncryptorStatus.ERROR) {
+     throw new IOException(proto.getMessage());
+   }
+ }
+
+ /** Returns the {@code Sasl.QOP} property negotiated by the SASL client. */
+ private String getNegotiatedQop() {
+ return (String) saslClient.getNegotiatedProperty(Sasl.QOP);
+ }
+
+ /** Tells whether the negotiated QOP is {@code auth-conf}, compared case-insensitively. */
+ private boolean isNegotiatedQopPrivacy() {
+   // equalsIgnoreCase(null) is false, so a missing QOP needs no separate check
+   return "auth-conf".equalsIgnoreCase(getNegotiatedQop());
+ }
+
+ /** Tells whether the comma separated requested QOP list contains {@code auth-conf}. */
+ private boolean requestedQopContainsPrivacy() {
+   for (String requested : saslProps.get(Sasl.QOP).split(",")) {
+     if ("auth-conf".equals(requested)) {
+       return true;
+     }
+   }
+   return false;
+ }
+
+ /**
+  * Verifies that the SASL handshake has finished and that the negotiated quality of protection
+  * is one of the requested ones.
+  *
+  * @throws IOException if the handshake is not complete or the negotiated QOP was not requested
+  */
+ private void checkSaslComplete() throws IOException {
+   if (!saslClient.isComplete()) {
+     throw new IOException("Failed to complete SASL handshake");
+   }
+   Set<String> requestedQop =
+     ImmutableSet.copyOf(Arrays.asList(saslProps.get(Sasl.QOP).split(",")));
+   String negotiatedQop = getNegotiatedQop();
+   // Placeholders keep the message from being built when debug logging is off.
+   LOG.debug("Verifying QOP, requested QOP = {}, negotiated QOP = {}", requestedQop,
+     negotiatedQop);
+   if (!requestedQop.contains(negotiatedQop)) {
+     throw new IOException(String.format("SASL handshake completed, but "
+       + "channel does not have acceptable quality of protection, "
+       + "requested = %s, negotiated = %s",
+       requestedQop, negotiatedQop));
+   }
+ }
+
+ /** Returns true when a QOP was negotiated and it is anything other than {@code auth}. */
+ private boolean useWrap() {
+   String negotiated = getNegotiatedQop();
+   if (negotiated == null) {
+     return false;
+   }
+   return !"auth".equalsIgnoreCase(negotiated);
+ }
+
+ /**
+  * Builds a copy of {@code option} whose in/out keys have been passed through
+  * {@code saslClient.unwrap}; null keys are kept as null and the IVs are copied unchanged.
+  *
+  * @param option cipher option received from the peer
+  * @param saslClient SASL client used to unwrap the keys
+  * @return a new cipher option carrying the unwrapped keys
+  * @throws IOException if unwrapping a key fails
+  */
+ private CipherOption unwrap(CipherOption option, SaslClient saslClient) throws IOException {
+   byte[] wrappedIn = option.getInKey();
+   byte[] plainIn = wrappedIn == null ? null : saslClient.unwrap(wrappedIn, 0, wrappedIn.length);
+   byte[] wrappedOut = option.getOutKey();
+   byte[] plainOut =
+       wrappedOut == null ? null : saslClient.unwrap(wrappedOut, 0, wrappedOut.length);
+   return new CipherOption(option.getCipherSuite(), plainIn, option.getInIv(), plainOut,
+       option.getOutIv());
+ }
+
+ /**
+  * Picks the first cipher option carried by {@code proto}, if any.
+  *
+  * @param proto negotiation message received from the peer
+  * @param isNegotiatedQopPrivacy whether the keys must be unwrapped with the SASL client first
+  * @param saslClient SASL client used for unwrapping
+  * @return the first cipher option (unwrapped when requested), or null when the message has none
+  * @throws IOException if unwrapping fails
+  */
+ private CipherOption getCipherOption(DataTransferEncryptorMessageProto proto,
+     boolean isNegotiatedQopPrivacy, SaslClient saslClient) throws IOException {
+   List<CipherOption> candidates =
+       PBHelperClient.convertCipherOptionProtos(proto.getCipherOptionList());
+   if (candidates == null || candidates.isEmpty()) {
+     return null;
+   }
+   CipherOption first = candidates.get(0);
+   if (isNegotiatedQopPrivacy) {
+     return unwrap(first, saslClient);
+   }
+   return first;
+ }
+
+ /**
+ * Drives the negotiation state machine. Each {@code DataTransferEncryptorMessageProto} is
+ * status-checked, its payload is fed to the SASL client as a challenge, and the current
+ * {@code step} decides what happens next. Any other message type is passed down the pipeline
+ * untouched.
+ */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof DataTransferEncryptorMessageProto) {
+ DataTransferEncryptorMessageProto proto = (DataTransferEncryptorMessageProto) msg;
+ // Throws if the peer reported an error status.
+ check(proto);
+ byte[] challenge = proto.getPayload().toByteArray();
+ byte[] response = saslClient.evaluateChallenge(challenge);
+ switch (step) {
+ case 1: {
+ // Reply to the challenge; cipher options are only offered when privacy was requested.
+ List<CipherOption> cipherOptions = null;
+ if (requestedQopContainsPrivacy()) {
+ cipherOptions = getCipherOptions();
+ }
+ sendSaslMessage(ctx, response, cipherOptions);
+ ctx.flush();
+ step++;
+ break;
+ }
+ case 2: {
+ // No further response is expected from the SASL client at this step.
+ assert response == null;
+ checkSaslComplete();
+ CipherOption cipherOption =
+ getCipherOption(proto, isNegotiatedQopPrivacy(), saslClient);
+ // Negotiation is done: strip every handler currently in the pipeline (including this one)
+ // before installing the data-path handlers.
+ ChannelPipeline p = ctx.pipeline();
+ while (p.first() != null) {
+ p.removeFirst();
+ }
+ if (cipherOption != null) {
+ // A cipher option was agreed: encrypt with the "in" key/IV and decrypt with the "out"
+ // key/IV of that option.
+ CryptoCodec codec = CryptoCodec.getInstance(conf, cipherOption.getCipherSuite());
+ p.addLast(new EncryptHandler(codec, cipherOption.getInKey(), cipherOption.getInIv()),
+ new DecryptHandler(codec, cipherOption.getOutKey(), cipherOption.getOutIv()));
+ } else {
+ // No cipher option: fall back to SASL wrap/unwrap when the negotiated QOP requires it.
+ // Inbound frames are split on a 4-byte length prefix before being unwrapped.
+ if (useWrap()) {
+ p.addLast(new SaslWrapHandler(saslClient),
+ new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4),
+ new SaslUnwrapHandler(saslClient));
+ }
+ }
+ promise.trySuccess(null);
+ break;
+ }
+ default:
+ throw new IllegalArgumentException("Unrecognized negotiation step: " + step);
+ }
+ } else {
+ ctx.fireChannelRead(msg);
+ }
+ }
+
+ /** Fails the negotiation promise with {@code cause}; the exception is not propagated further. */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ promise.tryFailure(cause);
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent && ((IdleStateEvent) evt).state() == READER_IDLE) {
+ promise.tryFailure(new IOException("Timeout(" + timeoutMs + "ms) waiting for response"));
+ } else {
+ super.userEventTriggered(ctx, evt);
+ }
+ }
+ }
+
+ /**
+  * Inbound handler that strips the 4-byte length prefix from each frame, unwraps the remainder
+  * with the SASL client and forwards the plain bytes.
+  */
+ private static final class SaslUnwrapHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+   private final SaslClient saslClient;
+
+   public SaslUnwrapHandler(SaslClient saslClient) {
+     this.saslClient = saslClient;
+   }
+
+   /**
+    * Disposes the SASL client and then forwards the inactive event, so handlers placed after this
+    * one still learn that the connection was lost.
+    */
+   @Override
+   public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+     try {
+       saslClient.dispose();
+     } finally {
+       ctx.fireChannelInactive();
+     }
+   }
+
+   @Override
+   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+     // Skip the 4-byte length field; the rest of the frame is the wrapped payload.
+     msg.skipBytes(4);
+     byte[] b = new byte[msg.readableBytes()];
+     msg.readBytes(b);
+     ctx.fireChannelRead(Unpooled.wrappedBuffer(saslClient.unwrap(b, 0, b.length)));
+   }
+ }
+
+ /**
+  * Outbound handler that accumulates written {@code ByteBuf}s and, on flush, wraps the
+  * accumulated bytes with the SASL client and writes them as one frame prefixed by a 4-byte
+  * length.
+  */
+ private static final class SaslWrapHandler extends ChannelOutboundHandlerAdapter {
+
+   private final SaslClient saslClient;
+
+   // Accumulates pending outbound bytes until the next flush; released and nulled on close.
+   private CompositeByteBuf cBuf;
+
+   public SaslWrapHandler(SaslClient saslClient) {
+     this.saslClient = saslClient;
+   }
+
+   @Override
+   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+     cBuf = new CompositeByteBuf(ctx.alloc(), false, Integer.MAX_VALUE);
+   }
+
+   /**
+    * Buffers {@code ByteBuf} messages for the next flush; anything else is passed on together
+    * with the caller's promise so that the caller is notified of the write result.
+    */
+   @Override
+   public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
+       throws Exception {
+     if (msg instanceof ByteBuf) {
+       ByteBuf buf = (ByteBuf) msg;
+       cBuf.addComponent(buf);
+       // addComponent does not advance the writer index, so do it by hand.
+       cBuf.writerIndex(cBuf.writerIndex() + buf.readableBytes());
+       // NOTE(review): the promise for buffered writes is never completed, as in the original
+       // code; confirm that no caller waits on it.
+     } else {
+       ctx.write(msg, promise);
+     }
+   }
+
+   @Override
+   public void flush(ChannelHandlerContext ctx) throws Exception {
+     if (cBuf.isReadable()) {
+       byte[] b = new byte[cBuf.readableBytes()];
+       cBuf.readBytes(b);
+       cBuf.discardReadComponents();
+       byte[] wrapped = saslClient.wrap(b, 0, b.length);
+       ByteBuf buf = ctx.alloc().ioBuffer(4 + wrapped.length);
+       buf.writeInt(wrapped.length);
+       buf.writeBytes(wrapped);
+       ctx.write(buf);
+     }
+     ctx.flush();
+   }
+
+   /**
+    * Releases the accumulation buffer and then forwards the close request. The forward is
+    * required: without it the close never reaches the transport and the channel stays open.
+    */
+   @Override
+   public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+     // Guard against a second close() call after the buffer has already been released.
+     if (cBuf != null) {
+       cBuf.release();
+       cBuf = null;
+     }
+     ctx.close(promise);
+   }
+ }
+
+ /** Inbound handler that decrypts each incoming buffer and forwards the plain bytes. */
+ private static final class DecryptHandler extends SimpleChannelInboundHandler<ByteBuf> {
+
+   private final Decryptor decryptor;
+
+   /**
+    * @param codec codec used to create the decryptor
+    * @param key decryption key
+    * @param iv initialization vector; a copy is handed to the decryptor
+    */
+   public DecryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+       throws GeneralSecurityException, IOException {
+     this.decryptor = codec.createDecryptor();
+     this.decryptor.init(key, Arrays.copyOf(iv, iv.length));
+   }
+
+   /**
+    * Decrypts {@code msg} into a newly allocated direct buffer and fires it downstream. Buffers
+    * allocated here are released if allocation or decryption fails, so an error does not leak
+    * direct memory.
+    */
+   @Override
+   protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
+     ByteBuf inBuf = msg;
+     boolean release = false;
+     ByteBuf outBuf = null;
+     boolean decrypted = false;
+     try {
+       if (msg.nioBufferCount() != 1) {
+         // The decryptor takes a single ByteBuffer, so copy the message into one direct buffer.
+         inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+         release = true;
+         msg.readBytes(inBuf);
+       }
+       int length = inBuf.readableBytes();
+       ByteBuffer inBuffer = inBuf.nioBuffer();
+       outBuf = ctx.alloc().directBuffer(length);
+       ByteBuffer outBuffer = outBuf.nioBuffer(0, length);
+       decryptor.decrypt(inBuffer, outBuffer);
+       // The NIO view was written directly, so publish the bytes by moving the writer index.
+       outBuf.writerIndex(length);
+       decrypted = true;
+     } finally {
+       if (release) {
+         inBuf.release();
+       }
+       if (!decrypted && outBuf != null) {
+         outBuf.release();
+       }
+     }
+     ctx.fireChannelRead(outBuf);
+   }
+ }
+
+ /** Outbound encoder that encrypts each written buffer into the encoder's output buffer. */
+ private static final class EncryptHandler extends MessageToByteEncoder<ByteBuf> {
+
+   private final Encryptor encryptor;
+
+   /**
+    * @param codec codec used to create the encryptor
+    * @param key encryption key
+    * @param iv initialization vector; a copy is handed to the encryptor
+    */
+   public EncryptHandler(CryptoCodec codec, byte[] key, byte[] iv)
+       throws GeneralSecurityException, IOException {
+     this.encryptor = codec.createEncryptor();
+     this.encryptor.init(key, Arrays.copyOf(iv, iv.length));
+   }
+
+   /** Allocates an output buffer with an initial capacity equal to the message's readable bytes. */
+   @Override
+   protected ByteBuf allocateBuffer(ChannelHandlerContext ctx, ByteBuf msg, boolean preferDirect)
+       throws Exception {
+     if (preferDirect) {
+       return ctx.alloc().directBuffer(msg.readableBytes());
+     } else {
+       return ctx.alloc().buffer(msg.readableBytes());
+     }
+   }
+
+   /**
+    * Encrypts {@code msg} into {@code out}. The temporary copy made for multi-component inputs
+    * is released in a finally block so that a failing encrypt call does not leak it.
+    */
+   @Override
+   protected void encode(ChannelHandlerContext ctx, ByteBuf msg, ByteBuf out) throws Exception {
+     ByteBuf inBuf = msg;
+     boolean release = false;
+     try {
+       if (msg.nioBufferCount() != 1) {
+         // The encryptor takes a single ByteBuffer, so copy the message into one direct buffer.
+         inBuf = ctx.alloc().directBuffer(msg.readableBytes());
+         release = true;
+         msg.readBytes(inBuf);
+       }
+       int length = inBuf.readableBytes();
+       ByteBuffer inBuffer = inBuf.nioBuffer();
+       ByteBuffer outBuffer = out.nioBuffer(0, length);
+       encryptor.encrypt(inBuffer, outBuffer);
+       // The NIO view was written directly, so publish the bytes by moving the writer index.
+       out.writerIndex(length);
+     } finally {
+       if (release) {
+         inBuf.release();
+       }
+     }
+   }
+ }
+
+ /**
+  * Builds the SASL user name for an encrypted handshake: key id, block pool id and the
+  * Base64-encoded nonce, joined by {@code NAME_DELIMITER}.
+  */
+ private static String getUserNameFromEncryptionKey(DataEncryptionKey encryptionKey) {
+   byte[] encodedNonce = Base64.encodeBase64(encryptionKey.nonce, false);
+   String nonceText = new String(encodedNonce, Charsets.UTF_8);
+   return encryptionKey.keyId + NAME_DELIMITER + encryptionKey.blockPoolId + NAME_DELIMITER
+       + nonceText;
+ }
+
+ /** Converts the raw encryption key into a SASL password: Base64 text as a char array. */
+ private static char[] encryptionKeyToPassword(byte[] encryptionKey) {
+   byte[] encoded = Base64.encodeBase64(encryptionKey, false);
+   String text = new String(encoded, Charsets.UTF_8);
+   return text.toCharArray();
+ }
+
+ /** Returns the Base64-encoded block token identifier, used as the SASL user name. */
+ private static String buildUsername(Token<BlockTokenIdentifier> blockToken) {
+   byte[] encodedIdentifier = Base64.encodeBase64(blockToken.getIdentifier(), false);
+   return new String(encodedIdentifier, Charsets.UTF_8);
+ }
+
+ /** Returns the Base64-encoded block token password as a char array, used as the SASL password. */
+ private static char[] buildClientPassword(Token<BlockTokenIdentifier> blockToken) {
+   byte[] encodedPassword = Base64.encodeBase64(blockToken.getPassword(), false);
+   String text = new String(encodedPassword, Charsets.UTF_8);
+   return text.toCharArray();
+ }
+
+ private static Map<String, String> createSaslPropertiesForEncryption(String encryptionAlgorithm) {
+ Map<String, String> saslProps = Maps.newHashMapWithExpectedSize(3);
+ saslProps.put(Sasl.QOP, QualityOfProtection.PRIVACY.getSaslQop());
+ saslProps.put(Sasl.SERVER_AUTH, "true");
+ saslProps.put("com.sun.security.sasl.digest.cipher", encryptionAlgorithm);
+ return saslProps;
+ }
+
+ /**
+  * Installs the handlers that run the SASL negotiation on {@code channel}: a read-idle timeout,
+  * protobuf varint framing and decoding, and the negotiation handler itself. A
+  * {@link SaslException} raised while setting this up fails {@code saslPromise}.
+  */
+ private static void doSaslNegotiation(Configuration conf, Channel channel, int timeoutMs,
+     String username, char[] password, Map<String, String> saslProps, Promise<Void> saslPromise,
+     DFSClient dfsClient) {
+   try {
+     IdleStateHandler idleHandler = new IdleStateHandler(timeoutMs, 0, 0, TimeUnit.MILLISECONDS);
+     ProtobufVarint32FrameDecoder frameDecoder = new ProtobufVarint32FrameDecoder();
+     ProtobufDecoder protoDecoder =
+         new ProtobufDecoder(DataTransferEncryptorMessageProto.getDefaultInstance());
+     SaslNegotiateHandler negotiateHandler = new SaslNegotiateHandler(conf, username, password,
+         saslProps, timeoutMs, saslPromise, dfsClient);
+     channel.pipeline().addLast(idleHandler, frameDecoder, protoDecoder, negotiateHandler);
+   } catch (SaslException e) {
+     saslPromise.tryFailure(e);
+   }
+ }
+
+ /**
+  * Decides whether a SASL handshake is needed for {@code channel} and either starts it or
+  * completes {@code saslPromise} straight away. The checks run in this order:
+  * <ol>
+  * <li>trusted channel: no handshake;</li>
+  * <li>a data encryption key is available: encrypted handshake;</li>
+  * <li>security disabled: no handshake;</li>
+  * <li>datanode transfer port below 1024: no handshake;</li>
+  * <li>fallback to simple auth is set: no handshake;</li>
+  * <li>a SASL properties resolver is configured: general handshake using the block token;</li>
+  * <li>otherwise: no handshake.</li>
+  * </ol>
+  */
+ static void trySaslNegotiate(Configuration conf, Channel channel, DatanodeInfo dnInfo,
+     int timeoutMs, DFSClient client, Token<BlockTokenIdentifier> accessToken,
+     Promise<Void> saslPromise) throws IOException {
+   SaslDataTransferClient saslClient = client.getSaslDataTransferClient();
+   SaslPropertiesResolver saslPropsResolver = SASL_ADAPTOR.getSaslPropsResolver(saslClient);
+   TrustedChannelResolver trustedChannelResolver =
+       SASL_ADAPTOR.getTrustedChannelResolver(saslClient);
+   AtomicBoolean fallbackToSimpleAuth = SASL_ADAPTOR.getFallbackToSimpleAuth(saslClient);
+   InetAddress addr = ((InetSocketAddress) channel.remoteAddress()).getAddress();
+
+   if (trustedChannelResolver.isTrusted() || trustedChannelResolver.isTrusted(addr)) {
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   DataEncryptionKey encryptionKey = client.newDataEncryptionKey();
+   if (encryptionKey != null) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(
+         "SASL client doing encrypted handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     doSaslNegotiation(conf, channel, timeoutMs, getUserNameFromEncryptionKey(encryptionKey),
+       encryptionKeyToPassword(encryptionKey.encryptionKey),
+       createSaslPropertiesForEncryption(encryptionKey.encryptionAlgorithm), saslPromise,
+       client);
+     return;
+   }
+
+   if (!UserGroupInformation.isSecurityEnabled()) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in unsecured configuration for addr = " + addr
+           + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (dnInfo.getXferPort() < 1024) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in secured configuration with "
+           + "privileged port for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (fallbackToSimpleAuth != null && fallbackToSimpleAuth.get()) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug("SASL client skipping handshake in secured configuration with "
+           + "unsecured cluster for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     saslPromise.trySuccess(null);
+     return;
+   }
+
+   if (saslPropsResolver != null) {
+     if (LOG.isDebugEnabled()) {
+       LOG.debug(
+         "SASL client doing general handshake for addr = " + addr + ", datanodeId = " + dnInfo);
+     }
+     doSaslNegotiation(conf, channel, timeoutMs, buildUsername(accessToken),
+       buildClientPassword(accessToken), saslPropsResolver.getClientProperties(addr), saslPromise,
+       client);
+     return;
+   }
+
+   // It's a secured cluster using non-privileged ports, but no SASL. The only way this can
+   // happen is if the DataNode has ignore.secure.ports.for.testing configured, so this is a rare
+   // edge case.
+   if (LOG.isDebugEnabled()) {
+     LOG.debug("SASL client skipping handshake in secured configuration with no SASL "
+         + "protection configured for addr = " + addr + ", datanodeId = " + dnInfo);
+   }
+   saslPromise.trySuccess(null);
+ }
+
+ /**
+  * Creates an encryptor for a file that carries encryption info.
+  *
+  * @return the encryptor built by {@code TRANSPARENT_CRYPTO_HELPER}, or null when {@code stat}
+  *         has no file encryption info
+  */
+ static Encryptor createEncryptor(Configuration conf, HdfsFileStatus stat, DFSClient client)
+     throws IOException {
+   FileEncryptionInfo encryptionInfo = stat.getFileEncryptionInfo();
+   return encryptionInfo == null
+       ? null
+       : TRANSPARENT_CRYPTO_HELPER.createEncryptor(conf, encryptionInfo, client);
+ }
+}
diff --git a/metron-platform/metron-indexing/metron-indexing-common/pom.xml b/metron-platform/metron-indexing/metron-indexing-common/pom.xml
index f118521..49d1605 100644
--- a/metron-platform/metron-indexing/metron-indexing-common/pom.xml
+++ b/metron-platform/metron-indexing/metron-indexing-common/pom.xml
@@ -60,11 +60,6 @@
<scope>runtime</scope>
</dependency>
<dependency>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-math</artifactId>
- <version>2.2</version>
- </dependency>
- <dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${global_hbase_version}</version>
@@ -196,6 +191,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-math</artifactId>
+ <version>2.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
index 384cdaa..09147b0 100644
--- a/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
+++ b/metron-platform/metron-pcap-backend/src/main/java/org/apache/metron/spout/pcap/PartitionHDFSWriter.java
@@ -212,7 +212,7 @@ public class PartitionHDFSWriter implements AutoCloseable, Serializable {
close();
if(fs instanceof LocalFileSystem) {
- outputStream = new FSDataOutputStream(new FileOutputStream(new File(path.toString())));
+ outputStream = new FSDataOutputStream(new FileOutputStream(new File(path.toString())), new FileSystem.Statistics(fs.getScheme()));
syncHandler = SyncHandlers.LOCAL.getHandler();
}
else {
diff --git a/metron-stellar/stellar-common/pom.xml b/metron-stellar/stellar-common/pom.xml
index 584b27c..1380e5a 100644
--- a/metron-stellar/stellar-common/pom.xml
+++ b/metron-stellar/stellar-common/pom.xml
@@ -148,7 +148,7 @@
<groupId>commons-beanutils</groupId>
<artifactId>commons-beanutils</artifactId>
<optional>true</optional>
- <version>1.8.3</version>
+ <version>1.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
diff --git a/pom.xml b/pom.xml
index ec5dd4f..90de889 100644
--- a/pom.xml
+++ b/pom.xml
@@ -145,6 +145,7 @@
<global_storm_kafka_version>1.2.3</global_storm_kafka_version>
<global_zeppelin_version>0.8.0</global_zeppelin_version>
<global_solr_version>7.4.0</global_solr_version>
+ <base_hadoop_version>3.1.1</base_hadoop_version>
<global_hbase_version>2.0.2</global_hbase_version>
<global_hbase_guava_version>17.0</global_hbase_guava_version>
<!-- For Zookeeper/Curator version compatibility see