You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/04 02:44:44 UTC

[pulsar] 06/09: [pulsar-broker]Add alerts for expired/expiring soon tokens (#9321)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 5b7f99fbd1849413e98429ca45f9d4ebf36e41aa
Author: Zike Yang <Ro...@outlook.com>
AuthorDate: Wed Feb 3 01:47:51 2021 +0800

    [pulsar-broker]Add alerts for expired/expiring soon tokens (#9321)
    
    * Add expired token alert.
    
    * Add expiring token alert. Fix expired token metrics.
    
    * Fix testDuplicateMetricTypeDefinitions failure.
    
    (cherry picked from commit ca64811f40b78920e05f6f9ec67f2405bd8a12a0)
---
 .../AuthenticationProviderToken.java               |  30 +++++-
 .../pulsar/broker/stats/PrometheusMetricsTest.java | 110 +++++++++++++++++++++
 2 files changed, 135 insertions(+), 5 deletions(-)

diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
index f2e366f..d847548 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderToken.java
@@ -24,10 +24,14 @@ import java.io.IOException;
 import java.net.SocketAddress;
 import java.security.Key;
 
+import java.util.Date;
 import java.util.List;
 import javax.naming.AuthenticationException;
 import javax.net.ssl.SSLSession;
 
+import io.jsonwebtoken.ExpiredJwtException;
+import io.prometheus.client.Counter;
+import io.prometheus.client.Histogram;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
@@ -69,6 +73,16 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
 
     final static String TOKEN = "token";
 
+    private static final Counter expiredTokenMetrics = Counter.build()
+            .name("pulsar_expired_token_count")
+            .help("Pulsar expired token")
+            .register();
+    private static final Histogram expiringTokenMinutesMetrics = Histogram.build()
+            .name("pulsar_expiring_token_minutes")
+            .help("The remaining time of expiring token in minutes")
+            .buckets(5, 10, 60, 240)
+            .register();
+
     private Key validationKey;
     private String roleClaim;
     private SignatureAlgorithm publicKeyAlg;
@@ -121,18 +135,18 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
 
     @Override
     public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
-        // Get Token
-        String token;
         try {
+            // Get Token
+            String token;
             token = getToken(authData);
+            // Parse Token by validating
+            String role = getPrincipal(authenticateToken(token));
             AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
+            return role;
         } catch (AuthenticationException exception) {
             AuthenticationMetrics.authenticateFailure(getClass().getSimpleName(), getAuthMethodName(), exception.getMessage());
             throw exception;
         }
-
-        // Parse Token by validating
-        return getPrincipal(authenticateToken(token));
     }
 
     @Override
@@ -200,8 +214,14 @@ public class AuthenticationProviderToken implements AuthenticationProvider {
                 }
             }
 
+            if (jwt.getBody().getExpiration() != null) {
+                expiringTokenMinutesMetrics.observe((double) (jwt.getBody().getExpiration().getTime() - new Date().getTime()) / (60 * 1000));
+            }
             return jwt;
         } catch (JwtException e) {
+            if (e instanceof ExpiredJwtException) {
+                expiredTokenMetrics.inc();
+            }
             throw new AuthenticationException("Failed to authentication token: " + e.getMessage());
         }
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
index 58ed618..a266224 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java
@@ -32,6 +32,7 @@ import java.io.InputStreamReader;
 import java.io.StringWriter;
 import java.lang.reflect.Field;
 import java.math.RoundingMode;
+import java.util.Date;
 import java.text.NumberFormat;
 import java.util.Arrays;
 import java.util.Collection;
@@ -550,6 +551,11 @@ public class PrometheusMetricsTest extends BrokerTestBase {
                     if (!typeDefs.containsKey(summaryMetricName)) {
                         fail("Metric " + metricName + " does not have a corresponding summary type definition");
                     }
+                } else if (metricName.endsWith("_bucket")) {
+                    String summaryMetricName = metricName.substring(0, metricName.indexOf("_bucket"));
+                    if (!typeDefs.containsKey(summaryMetricName)) {
+                        fail("Metric " + metricName + " does not have a corresponding summary type definition");
+                    }
                 } else {
                     fail("Metric " + metricName + " does not have a type definition");
                 }
@@ -771,6 +777,110 @@ public class PrometheusMetricsTest extends BrokerTestBase {
         assertEquals(metric.tags.get("provider_name"), provider.getClass().getSimpleName());
     }
 
+    @Test
+    public void testExpiredTokenMetrics() throws Exception {
+        SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        Date expiredDate = new Date(System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1));
+        String expiredToken = AuthTokenUtils.createToken(secretKey, "subject", Optional.of(expiredDate));
+
+        try {
+            provider.authenticate(new AuthenticationDataSource() {
+                @Override
+                public boolean hasDataFromCommand() {
+                    return true;
+                }
+
+                @Override
+                public String getCommandData() {
+                    return expiredToken;
+                }
+            });
+            fail("Should have failed");
+        } catch (AuthenticationException e) {
+            // expected, token was expired
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_expired_token_count");
+        assertEquals(cm.size(), 1);
+
+        provider.close();
+    }
+
+    @Test
+    public void testExpiringTokenMetrics() throws Exception {
+        SecretKey secretKey = AuthTokenUtils.createSecretKey(SignatureAlgorithm.HS256);
+
+        AuthenticationProviderToken provider = new AuthenticationProviderToken();
+
+        Properties properties = new Properties();
+        properties.setProperty("tokenSecretKey", AuthTokenUtils.encodeKeyBase64(secretKey));
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        provider.initialize(conf);
+
+        int[] tokenRemainTime = new int[]{3, 7, 40, 100, 400};
+
+        for (int remainTime : tokenRemainTime) {
+            Date expiredDate = new Date(System.currentTimeMillis() + TimeUnit.MINUTES.toMillis(remainTime));
+            String expiringToken = AuthTokenUtils.createToken(secretKey, "subject", Optional.of(expiredDate));
+            provider.authenticate(new AuthenticationDataSource() {
+                @Override
+                public boolean hasDataFromCommand() {
+                    return true;
+                }
+
+                @Override
+                public String getCommandData() {
+                    return expiringToken;
+                }
+            });
+        }
+
+        ByteArrayOutputStream statsOut = new ByteArrayOutputStream();
+        PrometheusMetricsGenerator.generate(pulsar, false, false, statsOut);
+        String metricsStr = new String(statsOut.toByteArray());
+        Multimap<String, Metric> metrics = parseMetrics(metricsStr);
+        Metric countMetric = ((List<Metric>) metrics.get("pulsar_expiring_token_minutes_count")).get(0);
+        assertEquals(countMetric.value, tokenRemainTime.length);
+        List<Metric> cm = (List<Metric>) metrics.get("pulsar_expiring_token_minutes_bucket");
+        assertEquals(cm.size(), 5);
+        cm.forEach((e) -> {
+            switch (e.tags.get("le")) {
+                case "5.0":
+                    assertEquals(e.value, 1);
+                    break;
+                case "10.0":
+                    assertEquals(e.value, 2);
+                    break;
+                case "60.0":
+                    assertEquals(e.value, 3);
+                    break;
+                case "240.0":
+                    assertEquals(e.value, 4);
+                    break;
+                default:
+                    assertEquals(e.value, 5);
+                    break;
+            }
+        });
+        provider.close();
+    }
+
     /**
      * Hacky parsing of Prometheus text format. Sould be good enough for unit tests
      */