You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@metron.apache.org by ni...@apache.org on 2019/10/23 19:39:45 UTC
[metron] branch feature/METRON-2088-support-hdp-3.1 updated:
METRON-2297 Enrichment Topology Unable to Load Geo IP Data from HDFS
(nickwallen) closes apache/metron#1539
This is an automated email from the ASF dual-hosted git repository.
nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git
The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
new 0a619e0 METRON-2297 Enrichment Topology Unable to Load Geo IP Data from HDFS (nickwallen) closes apache/metron#1539
0a619e0 is described below
commit 0a619e082f34d65269f03ed4c6e16ba8bb5a0dbb
Author: nickwallen <ni...@apache.org>
AuthorDate: Wed Oct 23 15:39:19 2019 -0400
METRON-2297 Enrichment Topology Unable to Load Geo IP Data from HDFS (nickwallen) closes apache/metron#1539
---
metron-analytics/metron-profiler-storm/pom.xml | 5 +
.../CURRENT/configuration/metron-security-env.xml | 10 +-
.../CURRENT/package/scripts/params/params_linux.py | 7 +-
.../storm/security/auth/kerberos/AutoTGT.java | 254 +++++++++++++++++++++
.../metron-elasticsearch-storm/pom.xml | 5 +
.../metron-enrichment-storm/pom.xml | 5 +
.../metron-indexing/metron-indexing-storm/pom.xml | 5 +
metron-platform/metron-pcap-backend/pom.xml | 5 +
.../metron-solr/metron-solr-storm/pom.xml | 5 +
9 files changed, 299 insertions(+), 2 deletions(-)
diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml
index 84c0631..da049ac 100644
--- a/metron-analytics/metron-profiler-storm/pom.xml
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -148,6 +148,11 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>metron-common-storm</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>metron-writer-storm</artifactId>
<version>${project.parent.version}</version>
<exclusions>
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml
index beb0451..fc91a4a 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml
@@ -159,7 +159,6 @@
</value-attributes>
<on-ambari-upgrade add="true"/>
</property>
-
<property>
<name>metron.ldap.ssl.truststore</name>
<display-name>LDAP Truststore</display-name>
@@ -240,4 +239,13 @@
<description>Name of the role at the authentication provider that provides administrative access to Metron.</description>
<on-ambari-upgrade add="true"/>
</property>
+ <!-- Storm auto-credential plugins for Metron topologies; read by params_linux.py as
+      'topology_auto_credentials' from the 'metron-security-env' configuration. -->
+ <property>
+   <name>topology_auto_credentials</name>
+   <display-name>Topology Auto Credentials</display-name>
+   <description>The value of Storm's `topology.auto-credentials`. A list of plugins used to unpack credentials on the Storm worker. This value is only used when Kerberos has been enabled.</description>
+   <value>['org.apache.metron.storm.security.auth.kerberos.AutoTGT']</value>
+   <value-attributes>
+     <empty-value-valid>true</empty-value-valid>
+   </value-attributes>
+   <on-ambari-upgrade add="true"/>
+ </property>
</configuration>
\ No newline at end of file
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index b7fbcf4..d0026dd 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -258,7 +258,12 @@ profiler_topology_worker_childopts = client_jaas_arg if security_enabled else ''
indexing_topology_worker_childopts = client_jaas_arg if security_enabled else ''
pcap_topology_worker_childopts = client_jaas_arg if security_enabled else ''
metron_jvm_flags += (' ' + client_jaas_arg) if security_enabled else ''
-topology_auto_credentials = config['configurations']['storm-site'].get('nimbus.credential.renewers.classes', [])
+
+# the user-defined `topology.auto-credentials` are only used if security is enabled
+topology_auto_credentials = []
+if security_enabled:
+ topology_auto_credentials = config['configurations']['metron-security-env'].get('topology_auto_credentials', [])
+
# Needed for storm.config, because it needs Java String
topology_auto_credentials_double_quotes = str(topology_auto_credentials).replace("'", '"')
diff --git a/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java
new file mode 100644
index 0000000..adf0327
--- /dev/null
+++ b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.storm.security.auth.kerberos;
+
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.apache.storm.security.auth.kerberos.ClientCallbackHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.xml.bind.DatatypeConverter;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Automatically take a user's TGT, and push it, and renew it in Nimbus.
+ *
+ * <p>To allow a topology running in a Storm Supervisor to authenticate with Hadoop using
+ * the TGT pushed by Nimbus, this class should be configured as part of a topology's
+ * `topology.auto-credentials` parameter.
+ *
+ * <p>When using Storm's {@link org.apache.storm.security.auth.kerberos.AutoTGT}, Hadoop
+ * authentication fails because it is unable to dynamically load Hadoop's
+ * {@link org.apache.hadoop.security.UserGroupInformation} class at runtime. This issue is
+ * avoided by using this custom AutoTGT implementation that is packaged in a topology's uber jar.
+ *
+ * <p>This work is derived from the {@link org.apache.storm.security.auth.kerberos.AutoTGT} class
+ * in storm-core version 1.2.1.
+ */
+public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
+ private static final Logger LOG = LoggerFactory.getLogger(AutoTGT.class);
+ private static final float TICKET_RENEW_WINDOW = 0.80f;
+ protected static final AtomicReference<KerberosTicket> kerbTicket = new AtomicReference<>();
+ private Map conf;
+
+ public void prepare(Map conf) {
+ this.conf = conf;
+ }
+
+ private static KerberosTicket getTGT(Subject subject) {
+ Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+ for(KerberosTicket ticket: tickets) {
+ KerberosPrincipal server = ticket.getServer();
+ if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+ return ticket;
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public void populateCredentials(Map<String, String> credentials) {
+ //Log the user in and get the TGT
+ try {
+ Configuration login_conf = AuthUtils.GetConfiguration(conf);
+ ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
+
+ //login our user
+ Configuration.setConfiguration(login_conf);
+ LoginContext lc = new LoginContext(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
+ try {
+ lc.login();
+ final Subject subject = lc.getSubject();
+ KerberosTicket tgt = getTGT(subject);
+
+ if (tgt == null) { //error
+ throw new RuntimeException("Fail to verify user principal with section \""
+ +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+ }
+
+ if (!tgt.isForwardable()) {
+ throw new RuntimeException("The TGT found is not forwardable");
+ }
+
+ if (!tgt.isRenewable()) {
+ throw new RuntimeException("The TGT found is not renewable");
+ }
+
+ LOG.info("Pushing TGT for "+tgt.getClient()+" to topology.");
+ saveTGT(tgt, credentials);
+ } finally {
+ lc.logout();
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static void saveTGT(KerberosTicket tgt, Map<String, String> credentials) {
+ try {
+
+ byte[] bytes = AuthUtils.serializeKerberosTicket(tgt);
+ credentials.put("TGT", DatatypeConverter.printBase64Binary(bytes));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public static KerberosTicket getTGT(Map<String, String> credentials) {
+ KerberosTicket ret = null;
+ if (credentials != null && credentials.containsKey("TGT") && credentials.get("TGT") != null) {
+ ret = AuthUtils.deserializeKerberosTicket(DatatypeConverter.parseBase64Binary(credentials.get("TGT")));
+ }
+ return ret;
+ }
+
+ @Override
+ public void updateSubject(Subject subject, Map<String, String> credentials) {
+ populateSubjectWithTGT(subject, credentials);
+ }
+
+ @Override
+ public void populateSubject(Subject subject, Map<String, String> credentials) {
+ populateSubjectWithTGT(subject, credentials);
+ loginHadoopUser(subject);
+ }
+
+ private void populateSubjectWithTGT(Subject subject, Map<String, String> credentials) {
+ KerberosTicket tgt = getTGT(credentials);
+ if (tgt != null) {
+ clearCredentials(subject, tgt);
+ subject.getPrincipals().add(tgt.getClient());
+ kerbTicket.set(tgt);
+ } else {
+ LOG.info("No TGT found in credentials");
+ }
+ }
+
+ public static void clearCredentials(Subject subject, KerberosTicket tgt) {
+ Set<Object> creds = subject.getPrivateCredentials();
+ synchronized(creds) {
+ Iterator<Object> iterator = creds.iterator();
+ while (iterator.hasNext()) {
+ Object o = iterator.next();
+ if (o instanceof KerberosTicket) {
+ KerberosTicket t = (KerberosTicket)o;
+ iterator.remove();
+ try {
+ t.destroy();
+ } catch (DestroyFailedException e) {
+ LOG.warn("Failed to destory ticket ", e);
+ }
+ }
+ }
+ if(tgt != null) {
+ creds.add(tgt);
+ }
+ }
+ }
+
+ /**
+ * Hadoop does not just go off of a TGT, it needs a bit more. This
+ * should fill in the rest.
+ * @param subject the subject that should have a TGT in it.
+ */
+ private void loginHadoopUser(Subject subject) {
+ final String clazz = "org.apache.hadoop.security.UserGroupInformation";
+ Class<?> ugi;
+ try {
+ ugi = Class.forName(clazz);
+ } catch (ClassNotFoundException e) {
+ /*
+ * When using Storm's `org.apache.storm.security.auth.kerberos.AutoTGT` class, Hadoop
+ * authentication fails because it is unable to load Hadoop's UserGroupInformation class
+ * at runtime. This issue is avoided when using a custom AutoTGT implementation like
+ * this, packaged into a topology's uber jar.
+ */
+ LOG.error("Hadoop authentication failed. Hadoop was not found on the class path. " +
+ "Unable to load '{}' because '{}'", clazz, e.getMessage(), e);
+ return;
+ }
+ try {
+ Method isSecEnabled = ugi.getMethod("isSecurityEnabled");
+ if (!((Boolean)isSecEnabled.invoke(null))) {
+ LOG.warn("Hadoop is on the classpath but not configured for " +
+ "security, if you want security you need to be sure that " +
+ "hadoop.security.authentication=kerberos in core-site.xml " +
+ "in your jar");
+ return;
+ }
+ Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
+ login.invoke(null, subject);
+ } catch (Exception e) {
+ LOG.warn("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop may not be compatible.", e);
+ }
+ }
+
+ private long getRefreshTime(KerberosTicket tgt) {
+ long start = tgt.getStartTime().getTime();
+ long end = tgt.getEndTime().getTime();
+ return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
+ }
+
+ @Override
+ public void renew(Map<String,String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal) {
+ KerberosTicket tgt = getTGT(credentials);
+ if (tgt != null) {
+ long refreshTime = getRefreshTime(tgt);
+ long now = System.currentTimeMillis();
+ if (now >= refreshTime) {
+ try {
+ LOG.info("Renewing TGT for "+tgt.getClient());
+ tgt.refresh();
+ saveTGT(tgt, credentials);
+ } catch (RefreshFailedException e) {
+ LOG.warn("Failed to refresh TGT", e);
+ }
+ }
+ }
+ }
+
+ public void renew(Map<String, String> credentials, Map topologyConf) {
+ throw new IllegalStateException("SHOULD NOT BE CALLED");
+ }
+
+ public static void main(String[] args) throws Exception {
+ AutoTGT at = new AutoTGT();
+ Map conf = new java.util.HashMap();
+ conf.put("java.security.auth.login.config", args[0]);
+ at.prepare(conf);
+ Map<String,String> creds = new java.util.HashMap<String,String>();
+ at.populateCredentials(creds);
+ Subject s = new Subject();
+ at.populateSubject(s, creds);
+ LOG.info("Got a Subject "+s);
+ }
+}
\ No newline at end of file
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
index 9fccf68..a84e45d 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
@@ -100,6 +100,11 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>metron-common-storm</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>metron-indexing-storm</artifactId>
<version>${project.parent.version}</version>
<exclusions>
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
index 0628609..f441daa 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
@@ -74,6 +74,11 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>metron-common-storm</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>metron-writer-storm</artifactId>
<version>${project.parent.version}</version>
</dependency>
diff --git a/metron-platform/metron-indexing/metron-indexing-storm/pom.xml b/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
index ce3fb30..aadd021 100644
--- a/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
+++ b/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
@@ -42,6 +42,11 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>metron-common-storm</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>metron-writer-storm</artifactId>
<version>${project.parent.version}</version>
</dependency>
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index a9dd765..361ebaf 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -70,6 +70,11 @@
<scope>provided</scope>
</dependency>
<dependency>
+ <groupId>org.apache.metron</groupId>
+ <artifactId>metron-common-storm</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${global_junit_version}</version>
diff --git a/metron-platform/metron-solr/metron-solr-storm/pom.xml b/metron-platform/metron-solr/metron-solr-storm/pom.xml
index da12b0e..ba46af3 100644
--- a/metron-platform/metron-solr/metron-solr-storm/pom.xml
+++ b/metron-platform/metron-solr/metron-solr-storm/pom.xml
@@ -90,6 +90,11 @@
</dependency>
<dependency>
<groupId>org.apache.metron</groupId>
+ <artifactId>metron-common-storm</artifactId>
+ <version>${project.parent.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.metron</groupId>
<artifactId>metron-indexing-storm</artifactId>
<version>${project.parent.version}</version>
</dependency>