Posted to commits@metron.apache.org by ni...@apache.org on 2019/10/23 19:39:45 UTC

[metron] branch feature/METRON-2088-support-hdp-3.1 updated: METRON-2297 Enrichment Topology Unable to Load Geo IP Data from HDFS (nickwallen) closes apache/metron#1539

This is an automated email from the ASF dual-hosted git repository.

nickallen pushed a commit to branch feature/METRON-2088-support-hdp-3.1
in repository https://gitbox.apache.org/repos/asf/metron.git


The following commit(s) were added to refs/heads/feature/METRON-2088-support-hdp-3.1 by this push:
     new 0a619e0  METRON-2297 Enrichment Topology Unable to Load Geo IP Data from HDFS (nickwallen) closes apache/metron#1539
0a619e0 is described below

commit 0a619e082f34d65269f03ed4c6e16ba8bb5a0dbb
Author: nickwallen <ni...@apache.org>
AuthorDate: Wed Oct 23 15:39:19 2019 -0400

    METRON-2297 Enrichment Topology Unable to Load Geo IP Data from HDFS (nickwallen) closes apache/metron#1539
---
 metron-analytics/metron-profiler-storm/pom.xml     |   5 +
 .../CURRENT/configuration/metron-security-env.xml  |  10 +-
 .../CURRENT/package/scripts/params/params_linux.py |   7 +-
 .../storm/security/auth/kerberos/AutoTGT.java      | 254 +++++++++++++++++++++
 .../metron-elasticsearch-storm/pom.xml             |   5 +
 .../metron-enrichment-storm/pom.xml                |   5 +
 .../metron-indexing/metron-indexing-storm/pom.xml  |   5 +
 metron-platform/metron-pcap-backend/pom.xml        |   5 +
 .../metron-solr/metron-solr-storm/pom.xml          |   5 +
 9 files changed, 299 insertions(+), 2 deletions(-)

diff --git a/metron-analytics/metron-profiler-storm/pom.xml b/metron-analytics/metron-profiler-storm/pom.xml
index 84c0631..da049ac 100644
--- a/metron-analytics/metron-profiler-storm/pom.xml
+++ b/metron-analytics/metron-profiler-storm/pom.xml
@@ -148,6 +148,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common-storm</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-writer-storm</artifactId>
             <version>${project.parent.version}</version>
             <exclusions>
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml
index beb0451..fc91a4a 100644
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/configuration/metron-security-env.xml
@@ -159,7 +159,6 @@
     </value-attributes>
     <on-ambari-upgrade add="true"/>
   </property>
-
   <property>
     <name>metron.ldap.ssl.truststore</name>
     <display-name>LDAP Truststore</display-name>
@@ -240,4 +239,13 @@
     <description>Name of the role at the authentication provider that provides administrative access to Metron.</description>
     <on-ambari-upgrade add="true"/>
   </property>
+  <property>
+    <name>topology_auto_credentials</name>
+    <display-name>Topology Auto Credentials</display-name>
+    <description>The value of Storm's `topology.auto-credentials`. A list of plugins used to unpack credentials on the Storm worker. This value is only used when Kerberos has been enabled.</description>
+    <value>['org.apache.metron.storm.security.auth.kerberos.AutoTGT']</value>
+    <value-attributes>
+      <empty-value-valid>true</empty-value-valid>
+    </value-attributes>
+  </property>
 </configuration>
\ No newline at end of file
diff --git a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
index b7fbcf4..d0026dd 100755
--- a/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
+++ b/metron-deployment/packaging/ambari/metron-mpack/src/main/resources/common-services/METRON/CURRENT/package/scripts/params/params_linux.py
@@ -258,7 +258,12 @@ profiler_topology_worker_childopts = client_jaas_arg if security_enabled else ''
 indexing_topology_worker_childopts = client_jaas_arg if security_enabled else ''
 pcap_topology_worker_childopts = client_jaas_arg if security_enabled else ''
 metron_jvm_flags += (' ' + client_jaas_arg) if security_enabled else ''
-topology_auto_credentials = config['configurations']['storm-site'].get('nimbus.credential.renewers.classes', [])
+
+# the user-defined `topology.auto-credentials` are only used if security is enabled
+topology_auto_credentials = []
+if security_enabled:
+    topology_auto_credentials = config['configurations']['metron-security-env'].get('topology_auto_credentials', [])
+
 # Needed for storm.config, because it needs Java String
 topology_auto_credentials_double_quotes = str(topology_auto_credentials).replace("'", '"')
 
diff --git a/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java
new file mode 100644
index 0000000..adf0327
--- /dev/null
+++ b/metron-platform/metron-common-streaming/metron-common-storm/src/main/java/org/apache/metron/storm/security/auth/kerberos/AutoTGT.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.metron.storm.security.auth.kerberos;
+
+import org.apache.storm.security.auth.AuthUtils;
+import org.apache.storm.security.auth.IAutoCredentials;
+import org.apache.storm.security.auth.ICredentialsRenewer;
+import org.apache.storm.security.auth.kerberos.ClientCallbackHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.DestroyFailedException;
+import javax.security.auth.RefreshFailedException;
+import javax.security.auth.Subject;
+import javax.security.auth.kerberos.KerberosPrincipal;
+import javax.security.auth.kerberos.KerberosTicket;
+import javax.security.auth.login.Configuration;
+import javax.security.auth.login.LoginContext;
+import javax.xml.bind.DatatypeConverter;
+import java.lang.reflect.Method;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Automatically takes a user's TGT, pushes it to the topology, and renews it in Nimbus.
+ *
+ * <p>To allow a topology running in a Storm Supervisor to authenticate with Hadoop using
+ * the TGT pushed by Nimbus, this class should be configured as part of a topology's
+ * `topology.auto-credentials` parameter.
+ *
+ * <p>When using Storm's {@link org.apache.storm.security.auth.kerberos.AutoTGT}, Hadoop
+ * authentication fails because it is unable to dynamically load Hadoop's
+ * {@link org.apache.hadoop.security.UserGroupInformation} class at runtime. This issue is
+ * avoided by using this custom AutoTGT implementation that is packaged in a topology's uber jar.
+ *
+ * <p>This work is derived from the {@link org.apache.storm.security.auth.kerberos.AutoTGT} class
+ * in storm-core version 1.2.1.
+ */
+public class AutoTGT implements IAutoCredentials, ICredentialsRenewer {
+    private static final Logger LOG = LoggerFactory.getLogger(AutoTGT.class);
+    private static final float TICKET_RENEW_WINDOW = 0.80f;
+    protected static final AtomicReference<KerberosTicket> kerbTicket = new AtomicReference<>();
+    private Map conf;
+
+    public void prepare(Map conf) {
+        this.conf = conf;
+    }
+
+    private static KerberosTicket getTGT(Subject subject) {
+        Set<KerberosTicket> tickets = subject.getPrivateCredentials(KerberosTicket.class);
+        for(KerberosTicket ticket: tickets) {
+            KerberosPrincipal server = ticket.getServer();
+            if (server.getName().equals("krbtgt/" + server.getRealm() + "@" + server.getRealm())) {
+                return ticket;
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public void populateCredentials(Map<String, String> credentials) {
+        //Log the user in and get the TGT
+        try {
+            Configuration login_conf = AuthUtils.GetConfiguration(conf);
+            ClientCallbackHandler client_callback_handler = new ClientCallbackHandler(login_conf);
+
+            //login our user
+            Configuration.setConfiguration(login_conf);
+            LoginContext lc = new LoginContext(AuthUtils.LOGIN_CONTEXT_CLIENT, client_callback_handler);
+            try {
+                lc.login();
+                final Subject subject = lc.getSubject();
+                KerberosTicket tgt = getTGT(subject);
+
+                if (tgt == null) { //error
+                    throw new RuntimeException("Fail to verify user principal with section \""
+                            +AuthUtils.LOGIN_CONTEXT_CLIENT+"\" in login configuration file "+ login_conf);
+                }
+
+                if (!tgt.isForwardable()) {
+                    throw new RuntimeException("The TGT found is not forwardable");
+                }
+
+                if (!tgt.isRenewable()) {
+                    throw new RuntimeException("The TGT found is not renewable");
+                }
+
+                LOG.info("Pushing TGT for "+tgt.getClient()+" to topology.");
+                saveTGT(tgt, credentials);
+            } finally {
+                lc.logout();
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void saveTGT(KerberosTicket tgt, Map<String, String> credentials) {
+        try {
+
+            byte[] bytes = AuthUtils.serializeKerberosTicket(tgt);
+            credentials.put("TGT", DatatypeConverter.printBase64Binary(bytes));
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static KerberosTicket getTGT(Map<String, String> credentials) {
+        KerberosTicket ret = null;
+        if (credentials != null && credentials.containsKey("TGT") && credentials.get("TGT") != null) {
+            ret = AuthUtils.deserializeKerberosTicket(DatatypeConverter.parseBase64Binary(credentials.get("TGT")));
+        }
+        return ret;
+    }
+
+    @Override
+    public void updateSubject(Subject subject, Map<String, String> credentials) {
+        populateSubjectWithTGT(subject, credentials);
+    }
+
+    @Override
+    public void populateSubject(Subject subject, Map<String, String> credentials) {
+        populateSubjectWithTGT(subject, credentials);
+        loginHadoopUser(subject);
+    }
+
+    private void populateSubjectWithTGT(Subject subject, Map<String, String> credentials) {
+        KerberosTicket tgt = getTGT(credentials);
+        if (tgt != null) {
+            clearCredentials(subject, tgt);
+            subject.getPrincipals().add(tgt.getClient());
+            kerbTicket.set(tgt);
+        } else {
+            LOG.info("No TGT found in credentials");
+        }
+    }
+
+    public static void clearCredentials(Subject subject, KerberosTicket tgt) {
+        Set<Object> creds = subject.getPrivateCredentials();
+        synchronized(creds) {
+            Iterator<Object> iterator = creds.iterator();
+            while (iterator.hasNext()) {
+                Object o = iterator.next();
+                if (o instanceof KerberosTicket) {
+                    KerberosTicket t = (KerberosTicket)o;
+                    iterator.remove();
+                    try {
+                        t.destroy();
+                    } catch (DestroyFailedException e) {
+                        LOG.warn("Failed to destory ticket ", e);
+                    }
+                }
+            }
+            if(tgt != null) {
+                creds.add(tgt);
+            }
+        }
+    }
+
+    /**
+     * Hadoop does not just go off of a TGT; it needs a bit more. This
+     * should fill in the rest.
+     * @param subject the subject that should have a TGT in it.
+     */
+    private void loginHadoopUser(Subject subject) {
+        final String clazz = "org.apache.hadoop.security.UserGroupInformation";
+        Class<?> ugi;
+        try {
+            ugi = Class.forName(clazz);
+        } catch (ClassNotFoundException e) {
+            /*
+             * When using Storm's `org.apache.storm.security.auth.kerberos.AutoTGT` class, Hadoop
+             * authentication fails because it is unable to load Hadoop's UserGroupInformation class
+             * at runtime. This issue is avoided when using a custom AutoTGT implementation like
+             * this, packaged into a topology's uber jar.
+             */
+            LOG.error("Hadoop authentication failed. Hadoop was not found on the class path. " +
+                    "Unable to load '{}' because '{}'", clazz, e.getMessage(), e);
+            return;
+        }
+        try {
+            Method isSecEnabled = ugi.getMethod("isSecurityEnabled");
+            if (!((Boolean)isSecEnabled.invoke(null))) {
+                LOG.warn("Hadoop is on the classpath but not configured for " +
+                        "security, if you want security you need to be sure that " +
+                        "hadoop.security.authentication=kerberos in core-site.xml " +
+                        "in your jar");
+                return;
+            }
+            Method login = ugi.getMethod("loginUserFromSubject", Subject.class);
+            login.invoke(null, subject);
+        } catch (Exception e) {
+            LOG.warn("Something went wrong while trying to initialize Hadoop through reflection. This version of hadoop may not be compatible.", e);
+        }
+    }
+
+    private long getRefreshTime(KerberosTicket tgt) {
+        long start = tgt.getStartTime().getTime();
+        long end = tgt.getEndTime().getTime();
+        return start + (long) ((end - start) * TICKET_RENEW_WINDOW);
+    }
+
+    @Override
+    public void renew(Map<String,String> credentials, Map<String, Object> topologyConf, String topologyOwnerPrincipal) {
+        KerberosTicket tgt = getTGT(credentials);
+        if (tgt != null) {
+            long refreshTime = getRefreshTime(tgt);
+            long now = System.currentTimeMillis();
+            if (now >= refreshTime) {
+                try {
+                    LOG.info("Renewing TGT for "+tgt.getClient());
+                    tgt.refresh();
+                    saveTGT(tgt, credentials);
+                } catch (RefreshFailedException e) {
+                    LOG.warn("Failed to refresh TGT", e);
+                }
+            }
+        }
+    }
+
+    public void renew(Map<String, String> credentials, Map topologyConf) {
+        throw new IllegalStateException("SHOULD NOT BE CALLED");
+    }
+
+    public static void main(String[] args) throws Exception {
+        AutoTGT at = new AutoTGT();
+        Map conf = new java.util.HashMap();
+        conf.put("java.security.auth.login.config", args[0]);
+        at.prepare(conf);
+        Map<String,String> creds = new java.util.HashMap<String,String>();
+        at.populateCredentials(creds);
+        Subject s = new Subject();
+        at.populateSubject(s, creds);
+        LOG.info("Got a Subject "+s);
+    }
+}
\ No newline at end of file
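
For reference, this commit wires the new AutoTGT class in through Ambari (the metron-security-env property and params_linux.py changes above), but the same `topology.auto-credentials` parameter can also be set programmatically when submitting a topology. The following is a minimal, illustrative sketch only and is not part of this commit; the class name, topology name, and empty builder are placeholders, and it assumes Storm's standard Config and StormSubmitter APIs:

    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.topology.TopologyBuilder;

    import java.util.Collections;

    public class AutoTGTSubmitExample {
        public static void main(String[] args) throws Exception {
            // A real Metron topology is defined via Flux; spouts and bolts are omitted here.
            TopologyBuilder builder = new TopologyBuilder();

            Config conf = new Config();
            // Ask Nimbus to push (and renew) the submitter's TGT using the Metron AutoTGT plugin,
            // mirroring the default value of the new `topology_auto_credentials` Ambari property.
            conf.put(Config.TOPOLOGY_AUTO_CREDENTIALS,
                    Collections.singletonList("org.apache.metron.storm.security.auth.kerberos.AutoTGT"));

            StormSubmitter.submitTopology("example-topology", conf, builder.createTopology());
        }
    }
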
diff --git a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
index 9fccf68..a84e45d 100644
--- a/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
+++ b/metron-platform/metron-elasticsearch/metron-elasticsearch-storm/pom.xml
@@ -100,6 +100,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common-storm</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-indexing-storm</artifactId>
             <version>${project.parent.version}</version>
             <exclusions>
diff --git a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
index 0628609..f441daa 100644
--- a/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
+++ b/metron-platform/metron-enrichment/metron-enrichment-storm/pom.xml
@@ -74,6 +74,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common-storm</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-writer-storm</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
diff --git a/metron-platform/metron-indexing/metron-indexing-storm/pom.xml b/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
index ce3fb30..aadd021 100644
--- a/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
+++ b/metron-platform/metron-indexing/metron-indexing-storm/pom.xml
@@ -42,6 +42,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common-storm</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-writer-storm</artifactId>
             <version>${project.parent.version}</version>
         </dependency>
diff --git a/metron-platform/metron-pcap-backend/pom.xml b/metron-platform/metron-pcap-backend/pom.xml
index a9dd765..361ebaf 100644
--- a/metron-platform/metron-pcap-backend/pom.xml
+++ b/metron-platform/metron-pcap-backend/pom.xml
@@ -70,6 +70,11 @@
             <scope>provided</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common-storm</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <version>${global_junit_version}</version>
diff --git a/metron-platform/metron-solr/metron-solr-storm/pom.xml b/metron-platform/metron-solr/metron-solr-storm/pom.xml
index da12b0e..ba46af3 100644
--- a/metron-platform/metron-solr/metron-solr-storm/pom.xml
+++ b/metron-platform/metron-solr/metron-solr-storm/pom.xml
@@ -90,6 +90,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.metron</groupId>
+            <artifactId>metron-common-storm</artifactId>
+            <version>${project.parent.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.metron</groupId>
             <artifactId>metron-indexing-storm</artifactId>
             <version>${project.parent.version}</version>
         </dependency>