You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/22 16:58:05 UTC

[kafka] branch trunk updated: MINOR: Remove o.a.kafka.common.utils.Base64 and IS_JAVA8_COMPATIBLE

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new a30ecc6  MINOR: Remove o.a.kafka.common.utils.Base64 and IS_JAVA8_COMPATIBLE
a30ecc6 is described below

commit a30ecc67556e46e423a16e725bd712eb72ab338b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Tue May 22 09:57:11 2018 -0700

    MINOR: Remove o.a.kafka.common.utils.Base64 and IS_JAVA8_COMPATIBLE
    
    We no longer need them since we now require Java 8.
    
    Author: Ismael Juma <is...@juma.me.uk>
    
    Reviewers: Andras Beni <an...@cloudera.com>, Manikumar Reddy O <ma...@gmail.com>, Dong Lin <li...@gmail.com>
    
    Closes #5049 from ijuma/remove-base64
---
 .../scram/internal/ScramCredentialUtils.java       |  14 +-
 .../security/scram/internal/ScramMessages.java     |  19 +-
 .../security/token/delegation/DelegationToken.java |   4 +-
 .../java/org/apache/kafka/common/utils/Base64.java | 320 ---------------------
 .../java/org/apache/kafka/common/utils/Java.java   |   5 -
 .../scram/internal/ScramFormatterTest.java         |  10 +-
 .../security/scram/internal/ScramMessagesTest.java |  14 +-
 .../org/apache/kafka/common/utils/Base64Test.java  |  45 ---
 .../org/apache/kafka/common/utils/JavaTest.java    |   7 -
 .../test/java/org/apache/kafka/test/TestUtils.java |   4 +-
 .../java/org/apache/kafka/connect/data/Values.java |   4 +-
 .../scala/kafka/admin/DelegationTokenCommand.scala |   7 +-
 .../kafka/server/DelegationTokenManager.scala      |  11 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala    |   7 +-
 .../main/scala/kafka/utils/PasswordEncoder.scala   |   8 +-
 .../scala/unit/kafka/utils/CoreUtilsTest.scala     |   8 +-
 .../unit/kafka/utils/PasswordEncoderTest.scala     |  11 +-
 17 files changed, 61 insertions(+), 437 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramCredentialUtils.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramCredentialUtils.java
index 91e28a6..880747f 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramCredentialUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramCredentialUtils.java
@@ -16,12 +16,12 @@
  */
 package org.apache.kafka.common.security.scram.internal;
 
+import java.util.Base64;
 import java.util.Collection;
 import java.util.Properties;
 
 import org.apache.kafka.common.security.authenticator.CredentialCache;
 import org.apache.kafka.common.security.scram.ScramCredential;
-import org.apache.kafka.common.utils.Base64;
 
 /**
  * SCRAM Credential persistence utility functions. Implements format conversion used
@@ -43,11 +43,11 @@ public final class ScramCredentialUtils {
     public static String credentialToString(ScramCredential credential) {
         return String.format("%s=%s,%s=%s,%s=%s,%s=%d",
                SALT,
-               Base64.encoder().encodeToString(credential.salt()),
+               Base64.getEncoder().encodeToString(credential.salt()),
                STORED_KEY,
-                Base64.encoder().encodeToString(credential.storedKey()),
+               Base64.getEncoder().encodeToString(credential.storedKey()),
                SERVER_KEY,
-                Base64.encoder().encodeToString(credential.serverKey()),
+               Base64.getEncoder().encodeToString(credential.serverKey()),
                ITERATIONS,
                credential.iterations());
     }
@@ -58,9 +58,9 @@ public final class ScramCredentialUtils {
                 !props.containsKey(SERVER_KEY) || !props.containsKey(ITERATIONS)) {
             throw new IllegalArgumentException("Credentials not valid: " + str);
         }
-        byte[] salt = Base64.decoder().decode(props.getProperty(SALT));
-        byte[] storedKey = Base64.decoder().decode(props.getProperty(STORED_KEY));
-        byte[] serverKey = Base64.decoder().decode(props.getProperty(SERVER_KEY));
+        byte[] salt = Base64.getDecoder().decode(props.getProperty(SALT));
+        byte[] storedKey = Base64.getDecoder().decode(props.getProperty(STORED_KEY));
+        byte[] serverKey = Base64.getDecoder().decode(props.getProperty(SERVER_KEY));
         int iterations = Integer.parseInt(props.getProperty(ITERATIONS));
         return new ScramCredential(salt, storedKey, serverKey, iterations);
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramMessages.java b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramMessages.java
index 439b274..7e88c44 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramMessages.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/scram/internal/ScramMessages.java
@@ -16,9 +16,8 @@
  */
 package org.apache.kafka.common.security.scram.internal;
 
-import org.apache.kafka.common.utils.Base64;
-
 import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -156,7 +155,7 @@ public class ScramMessages {
             }
             this.nonce = matcher.group("nonce");
             String salt = matcher.group("salt");
-            this.salt = Base64.decoder().decode(salt);
+            this.salt = Base64.getDecoder().decode(salt);
         }
         public ServerFirstMessage(String clientNonce, String serverNonce, byte[] salt, int iterations) {
             this.nonce = clientNonce + serverNonce;
@@ -173,7 +172,7 @@ public class ScramMessages {
             return iterations;
         }
         String toMessage() {
-            return String.format("r=%s,s=%s,i=%d", nonce, Base64.encoder().encodeToString(salt), iterations);
+            return String.format("r=%s,s=%s,i=%d", nonce, Base64.getEncoder().encodeToString(salt), iterations);
         }
     }
     /**
@@ -200,9 +199,9 @@ public class ScramMessages {
             if (!matcher.matches())
                 throw new SaslException("Invalid SCRAM client final message format: " + message);
 
-            this.channelBinding = Base64.decoder().decode(matcher.group("channel"));
+            this.channelBinding = Base64.getDecoder().decode(matcher.group("channel"));
             this.nonce = matcher.group("nonce");
-            this.proof = Base64.decoder().decode(matcher.group("proof"));
+            this.proof = Base64.getDecoder().decode(matcher.group("proof"));
         }
         public ClientFinalMessage(byte[] channelBinding, String nonce) {
             this.channelBinding = channelBinding;
@@ -222,13 +221,13 @@ public class ScramMessages {
         }
         public String clientFinalMessageWithoutProof() {
             return String.format("c=%s,r=%s",
-                    Base64.encoder().encodeToString(channelBinding),
+                    Base64.getEncoder().encodeToString(channelBinding),
                     nonce);
         }
         String toMessage() {
             return String.format("%s,p=%s",
                     clientFinalMessageWithoutProof(),
-                    Base64.encoder().encodeToString(proof));
+                    Base64.getEncoder().encodeToString(proof));
         }
     }
     /**
@@ -259,7 +258,7 @@ public class ScramMessages {
                 // ignore
             }
             if (error == null) {
-                this.serverSignature = Base64.decoder().decode(matcher.group("signature"));
+                this.serverSignature = Base64.getDecoder().decode(matcher.group("signature"));
                 this.error = null;
             } else {
                 this.serverSignature = null;
@@ -280,7 +279,7 @@ public class ScramMessages {
             if (error != null)
                 return "e=" + error;
             else
-                return "v=" + Base64.encoder().encodeToString(serverSignature);
+                return "v=" + Base64.getEncoder().encodeToString(serverSignature);
         }
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
index e1f97c1..a1e2372 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/token/delegation/DelegationToken.java
@@ -17,9 +17,9 @@
 package org.apache.kafka.common.security.token.delegation;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.utils.Base64;
 
 import java.util.Arrays;
+import java.util.Base64;
 
 /**
  * A class representing a delegation token.
@@ -44,7 +44,7 @@ public class DelegationToken {
     }
 
     public String hmacAsBase64String() {
-        return Base64.encoder().encodeToString(hmac);
+        return Base64.getEncoder().encodeToString(hmac);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Base64.java b/clients/src/main/java/org/apache/kafka/common/utils/Base64.java
deleted file mode 100644
index 3ab4900..0000000
--- a/clients/src/main/java/org/apache/kafka/common/utils/Base64.java
+++ /dev/null
@@ -1,320 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.common.utils;
-
-import java.lang.invoke.MethodHandle;
-import java.lang.invoke.MethodHandles;
-import java.lang.invoke.MethodType;
-
-/**
- * Temporary class in order to support Java 7 and Java 9. `DatatypeConverter` is not in the base module of Java 9
- * and `java.util.Base64` was only introduced in Java 8.
- */
-public final class Base64 {
-
-    private static final Factory FACTORY;
-
-    static {
-        if (Java.IS_JAVA8_COMPATIBLE)
-            FACTORY = new Java8Factory();
-        else
-            FACTORY = new Java7Factory();
-    }
-
-    private Base64() {}
-
-    public static Encoder encoder() {
-        return FACTORY.encoder();
-    }
-
-    public static Encoder urlEncoderNoPadding() {
-        return FACTORY.urlEncoderNoPadding();
-    }
-
-    public static Decoder decoder() {
-        return FACTORY.decoder();
-    }
-
-    public static Decoder urlDecoder() {
-        return FACTORY.urlDecoder();
-    }
-
-    /* Contains a subset of methods from java.util.Base64.Encoder (introduced in Java 8) */
-    public interface Encoder {
-        String encodeToString(byte[] bytes);
-    }
-
-    /* Contains a subset of methods from java.util.Base64.Decoder (introduced in Java 8) */
-    public interface Decoder {
-        byte[] decode(String string);
-    }
-
-    private interface Factory {
-        Encoder urlEncoderNoPadding();
-        Encoder encoder();
-        Decoder urlDecoder();
-        Decoder decoder();
-    }
-
-    private static class Java8Factory implements Factory {
-
-        // Static final MethodHandles are optimised better by HotSpot
-        private static final MethodHandle URL_ENCODE_NO_PADDING;
-        private static final MethodHandle ENCODE;
-        private static final MethodHandle URL_DECODE;
-        private static final MethodHandle DECODE;
-
-        private static final Encoder URL_ENCODER_NO_PADDING;
-        private static final Encoder ENCODER;
-        private static final Decoder URL_DECODER;
-        private static final Decoder DECODER;
-
-        static {
-            try {
-                Class<?> base64Class = Class.forName("java.util.Base64");
-
-                MethodHandles.Lookup lookup = MethodHandles.publicLookup();
-
-                Class<?> juEncoderClass = Class.forName("java.util.Base64$Encoder");
-
-                MethodHandle getEncoder = lookup.findStatic(base64Class, "getEncoder",
-                        MethodType.methodType(juEncoderClass));
-                Object juEncoder;
-                try {
-                    juEncoder = getEncoder.invoke();
-                } catch (Throwable throwable) {
-                    // Invoked method doesn't throw checked exceptions, so safe to cast
-                    throw (RuntimeException) throwable;
-                }
-                MethodHandle encode = lookup.findVirtual(juEncoderClass, "encodeToString",
-                        MethodType.methodType(String.class, byte[].class));
-                ENCODE = encode.bindTo(juEncoder);
-
-
-                MethodHandle getUrlEncoder = lookup.findStatic(base64Class, "getUrlEncoder",
-                        MethodType.methodType(juEncoderClass));
-                Object juUrlEncoderNoPassing;
-                try {
-                    juUrlEncoderNoPassing = lookup.findVirtual(juEncoderClass, "withoutPadding",
-                            MethodType.methodType(juEncoderClass)).invoke(getUrlEncoder.invoke());
-                } catch (Throwable throwable) {
-                    // Invoked method doesn't throw checked exceptions, so safe to cast
-                    throw (RuntimeException) throwable;
-                }
-                URL_ENCODE_NO_PADDING = encode.bindTo(juUrlEncoderNoPassing);
-
-                Class<?> juDecoderClass = Class.forName("java.util.Base64$Decoder");
-                MethodHandle getDecoder = lookup.findStatic(base64Class, "getDecoder",
-                        MethodType.methodType(juDecoderClass));
-                MethodHandle decode = lookup.findVirtual(juDecoderClass, "decode",
-                        MethodType.methodType(byte[].class, String.class));
-                try {
-                    DECODE = decode.bindTo(getDecoder.invoke());
-                } catch (Throwable throwable) {
-                    // Invoked method doesn't throw checked exceptions, so safe to cast
-                    throw (RuntimeException) throwable;
-                }
-
-                MethodHandle getUrlDecoder = lookup.findStatic(base64Class, "getUrlDecoder",
-                        MethodType.methodType(juDecoderClass));
-                MethodHandle urlDecode = lookup.findVirtual(juDecoderClass, "decode",
-                        MethodType.methodType(byte[].class, String.class));
-                try {
-                    URL_DECODE = urlDecode.bindTo(getUrlDecoder.invoke());
-                } catch (Throwable throwable) {
-                    // Invoked method doesn't throw checked exceptions, so safe to cast
-                    throw (RuntimeException) throwable;
-                }
-                
-                URL_ENCODER_NO_PADDING = new Encoder() {
-                    @Override
-                    public String encodeToString(byte[] bytes) {
-                        try {
-                            return (String) URL_ENCODE_NO_PADDING.invokeExact(bytes);
-                        } catch (Throwable throwable) {
-                            // Invoked method doesn't throw checked exceptions, so safe to cast
-                            throw (RuntimeException) throwable;
-                        }
-                    }
-                };
-
-                ENCODER = new Encoder() {
-                    @Override
-                    public String encodeToString(byte[] bytes) {
-                        try {
-                            return (String) ENCODE.invokeExact(bytes);
-                        } catch (Throwable throwable) {
-                            // Invoked method doesn't throw checked exceptions, so safe to cast
-                            throw (RuntimeException) throwable;
-                        }
-                    }
-                };
-
-                URL_DECODER = new Decoder() {
-                    @Override
-                    public byte[] decode(String string) {
-                        try {
-                            return (byte[]) URL_DECODE.invokeExact(string);
-                        } catch (Throwable throwable) {
-                            // Invoked method doesn't throw checked exceptions, so safe to cast
-                            throw (RuntimeException) throwable;
-                        }
-                    }
-                };
-
-                DECODER = new Decoder() {
-                    @Override
-                    public byte[] decode(String string) {
-                        try {
-                            return (byte[]) DECODE.invokeExact(string);
-                        } catch (Throwable throwable) {
-                            // Invoked method doesn't throw checked exceptions, so safe to cast
-                            throw (RuntimeException) throwable;
-                        }
-                    }
-                };
-
-            } catch (ReflectiveOperationException e) {
-                // Should never happen
-                throw new RuntimeException(e);
-            }
-        }
-
-        @Override
-        public Encoder urlEncoderNoPadding() {
-            return URL_ENCODER_NO_PADDING;
-        }
-
-        @Override
-        public Encoder encoder() {
-            return ENCODER;
-        }
-
-        @Override
-        public Decoder decoder() {
-            return DECODER;
-        }
-
-        @Override
-        public Decoder urlDecoder() {
-            return URL_DECODER;
-        }
-    }
-
-    private static class Java7Factory implements Factory {
-
-        // Static final MethodHandles are optimised better by HotSpot
-        private static final MethodHandle PRINT;
-        private static final MethodHandle PARSE;
-
-        static {
-            try {
-                Class<?> cls = Class.forName("javax.xml.bind.DatatypeConverter");
-                MethodHandles.Lookup lookup = MethodHandles.publicLookup();
-                PRINT = lookup.findStatic(cls, "printBase64Binary", MethodType.methodType(String.class,
-                        byte[].class));
-                PARSE = lookup.findStatic(cls, "parseBase64Binary", MethodType.methodType(byte[].class,
-                        String.class));
-            } catch (ReflectiveOperationException e) {
-                // Should never happen
-                throw new RuntimeException(e);
-            }
-        }
-
-        public static final Encoder URL_ENCODER_NO_PADDING = new Encoder() {
-
-            @Override
-            public String encodeToString(byte[] bytes) {
-                if (bytes.length == 0)
-                    return "";
-                String base64EncodedUUID = Java7Factory.encodeToString(bytes);
-                // Convert to URL safe variant by replacing + and / with - and _ respectively.
-                String urlSafeBase64EncodedUUID = base64EncodedUUID.replace("+", "-").replace("/", "_");
-                // Remove any "=" or "==" padding at the end.
-                // Note that length will be at least 4 here.
-                int index = urlSafeBase64EncodedUUID.indexOf('=', urlSafeBase64EncodedUUID.length() - 2);
-                return index > 0 ? urlSafeBase64EncodedUUID.substring(0, index) : urlSafeBase64EncodedUUID;
-            }
-
-        };
-
-        public static final Encoder ENCODER = new Encoder() {
-            @Override
-            public String encodeToString(byte[] bytes) {
-                return Java7Factory.encodeToString(bytes);
-            }
-        };
-
-        public static final Decoder DECODER = new Decoder() {
-            @Override
-            public byte[] decode(String string) {
-                try {
-                    return (byte[]) PARSE.invokeExact(string);
-                } catch (Throwable throwable) {
-                    // Invoked method doesn't throw checked exceptions, so safe to cast
-                    throw (RuntimeException) throwable;
-                }
-            }
-        };
-
-        public static final Decoder URL_DECODER = new Decoder() {
-            @Override
-            public byte[] decode(String string) {
-                try {
-                    // Convert from URL safe variant by replacing - and _ with + and / respectively,
-                    // and append "=" or "==" padding; then decode.
-                    String unpadded = string.replace("-", "+").replace("_", "/");
-                    int padLength = 4 - (unpadded.length() & 3);
-                    return (byte[]) PARSE.invokeExact(padLength > 2 ? unpadded : unpadded + "==".substring(0, padLength));
-                } catch (Throwable throwable) {
-                    // Invoked method doesn't throw checked exceptions, so safe to cast
-                    throw (RuntimeException) throwable;
-                }
-            }
-        };
-
-        private static String encodeToString(byte[] bytes) {
-            try {
-                return (String) PRINT.invokeExact(bytes);
-            } catch (Throwable throwable) {
-                // Invoked method doesn't throw checked exceptions, so safe to cast
-                throw (RuntimeException) throwable;
-            }
-        }
-
-        @Override
-        public Encoder urlEncoderNoPadding() {
-            return URL_ENCODER_NO_PADDING;
-        }
-
-        @Override
-        public Encoder encoder() {
-            return ENCODER;
-        }
-
-        @Override
-        public Decoder urlDecoder() {
-            return URL_DECODER;
-        }
-
-        @Override
-        public Decoder decoder() {
-            return DECODER;
-        }
-    }
-}
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Java.java b/clients/src/main/java/org/apache/kafka/common/utils/Java.java
index fb8cafa..d3ada57 100644
--- a/clients/src/main/java/org/apache/kafka/common/utils/Java.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Java.java
@@ -38,7 +38,6 @@ public final class Java {
 
     // Having these as static final provides the best opportunity for compilar optimization
     public static final boolean IS_JAVA9_COMPATIBLE = VERSION.isJava9Compatible();
-    public static final boolean IS_JAVA8_COMPATIBLE = VERSION.isJava8Compatible();
 
     public static boolean isIbmJdk() {
         return System.getProperty("java.vendor").contains("IBM");
@@ -65,10 +64,6 @@ public final class Java {
             return majorVersion >= 9;
         }
 
-        // Package private for testing
-        boolean isJava8Compatible() {
-            return majorVersion > 1 || (majorVersion == 1 && minorVersion >= 8);
-        }
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramFormatterTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramFormatterTest.java
index b06b039..b880416 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramFormatterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramFormatterTest.java
@@ -16,13 +16,15 @@
  */
 package org.apache.kafka.common.security.scram.internal;
 
-import org.apache.kafka.common.utils.Base64;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFinalMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFirstMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFinalMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFirstMessage;
 
 import org.junit.Test;
+
+import java.util.Base64;
+
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 
@@ -53,13 +55,13 @@ public class ScramFormatterTest {
         String serverNonce = serverFirst.nonce().substring(clientNonce.length());
         assertEquals("%hvYDpWUa2RaTCAfuxFIlj)hNlF$k0", serverNonce);
         byte[] salt = serverFirst.salt();
-        assertArrayEquals(Base64.decoder().decode("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
+        assertArrayEquals(Base64.getDecoder().decode("W22ZaJ0SNY7soEsUEjb6gQ=="), salt);
         int iterations = serverFirst.iterations();
         assertEquals(4096, iterations);
         byte[] channelBinding = clientFinal.channelBinding();
-        assertArrayEquals(Base64.decoder().decode("biws"), channelBinding);
+        assertArrayEquals(Base64.getDecoder().decode("biws"), channelBinding);
         byte[] serverSignature = serverFinal.serverSignature();
-        assertArrayEquals(Base64.decoder().decode("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
+        assertArrayEquals(Base64.getDecoder().decode("6rriTRBi23WpRR/wtup+mMhUZUn/dB5nLTJRsjl95G4="), serverSignature);
 
         byte[] saltedPassword = formatter.saltedPassword(password, salt, iterations);
         byte[] serverKey = formatter.serverKey(saltedPassword);
diff --git a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramMessagesTest.java b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramMessagesTest.java
index d856f37..2c1962c 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramMessagesTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/scram/internal/ScramMessagesTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.security.scram.internal;
 
 import java.nio.charset.StandardCharsets;
+import java.util.Base64;
 import java.util.Collections;
 
 import javax.security.sasl.SaslException;
@@ -26,7 +27,6 @@ import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFinal
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ClientFirstMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFinalMessage;
 import org.apache.kafka.common.security.scram.internal.ScramMessages.ServerFirstMessage;
-import org.apache.kafka.common.utils.Base64;
 
 import org.junit.Before;
 import org.junit.Test;
@@ -294,11 +294,11 @@ public class ScramMessagesTest {
     }
 
     private String randomBytesAsString() {
-        return Base64.encoder().encodeToString(formatter.secureRandomBytes());
+        return Base64.getEncoder().encodeToString(formatter.secureRandomBytes());
     }
 
     private byte[] toBytes(String base64Str) {
-        return Base64.decoder().decode(base64Str);
+        return Base64.getDecoder().decode(base64Str);
     };
 
     private void checkClientFirstMessage(ClientFirstMessage message, String saslName, String nonce, String authzid) {
@@ -309,14 +309,14 @@ public class ScramMessagesTest {
 
     private void checkServerFirstMessage(ServerFirstMessage message, String nonce, String salt, int iterations) {
         assertEquals(nonce, message.nonce());
-        assertArrayEquals(Base64.decoder().decode(salt), message.salt());
+        assertArrayEquals(Base64.getDecoder().decode(salt), message.salt());
         assertEquals(iterations, message.iterations());
     }
 
     private void checkClientFinalMessage(ClientFinalMessage message, String channelBinding, String nonce, String proof) {
-        assertArrayEquals(Base64.decoder().decode(channelBinding), message.channelBinding());
+        assertArrayEquals(Base64.getDecoder().decode(channelBinding), message.channelBinding());
         assertEquals(nonce, message.nonce());
-        assertArrayEquals(Base64.decoder().decode(proof), message.proof());
+        assertArrayEquals(Base64.getDecoder().decode(proof), message.proof());
     }
 
     private void checkServerFinalMessage(ServerFinalMessage message, String error, String serverSignature) {
@@ -324,7 +324,7 @@ public class ScramMessagesTest {
         if (serverSignature == null)
             assertNull("Unexpected server signature", message.serverSignature());
         else
-            assertArrayEquals(Base64.decoder().decode(serverSignature), message.serverSignature());
+            assertArrayEquals(Base64.getDecoder().decode(serverSignature), message.serverSignature());
     }
 
     @SuppressWarnings("unchecked")
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Base64Test.java b/clients/src/test/java/org/apache/kafka/common/utils/Base64Test.java
deleted file mode 100644
index 9dcd15a..0000000
--- a/clients/src/test/java/org/apache/kafka/common/utils/Base64Test.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.utils;
-
-import static org.junit.Assert.assertEquals;
-
-import java.nio.charset.StandardCharsets;
-
-import org.apache.kafka.common.utils.Base64.Decoder;
-import org.apache.kafka.common.utils.Base64.Encoder;
-import org.junit.Test;
-
-public class Base64Test {
-
-    @Test
-    public void testBase64UrlEncodeDecode() {
-        confirmInversesForAllThreePaddingCases(Base64.urlEncoderNoPadding(), Base64.urlDecoder());
-    }
-
-    @Test
-    public void testBase64EncodeDecode() {
-        confirmInversesForAllThreePaddingCases(Base64.encoder(), Base64.decoder());
-    }
-
-    private static void confirmInversesForAllThreePaddingCases(Encoder encoder, Decoder decoder) {
-        for (String text : new String[] {"", "a", "ab", "abc"}) {
-            assertEquals(text, new String(decoder.decode(encoder.encodeToString(text.getBytes(StandardCharsets.UTF_8))),
-                    StandardCharsets.UTF_8));
-        }
-    }
-}
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/JavaTest.java b/clients/src/test/java/org/apache/kafka/common/utils/JavaTest.java
index 4810f92..0ffe06e 100644
--- a/clients/src/test/java/org/apache/kafka/common/utils/JavaTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/JavaTest.java
@@ -60,37 +60,30 @@ public class JavaTest {
         assertEquals(9, v.majorVersion);
         assertEquals(0, v.minorVersion);
         assertTrue(v.isJava9Compatible());
-        assertTrue(v.isJava8Compatible());
 
         v = Java.parseVersion("9.0.1");
         assertEquals(9, v.majorVersion);
         assertEquals(0, v.minorVersion);
         assertTrue(v.isJava9Compatible());
-        assertTrue(v.isJava8Compatible());
 
         v = Java.parseVersion("9.0.0.15"); // Azul Zulu
         assertEquals(9, v.majorVersion);
         assertEquals(0, v.minorVersion);
         assertTrue(v.isJava9Compatible());
-        assertTrue(v.isJava8Compatible());
 
         v = Java.parseVersion("9.1");
         assertEquals(9, v.majorVersion);
         assertEquals(1, v.minorVersion);
         assertTrue(v.isJava9Compatible());
-        assertTrue(v.isJava8Compatible());
 
         v = Java.parseVersion("1.8.0_152");
         assertEquals(1, v.majorVersion);
         assertEquals(8, v.minorVersion);
         assertFalse(v.isJava9Compatible());
-        assertTrue(v.isJava8Compatible());
 
         v = Java.parseVersion("1.7.0_80");
         assertEquals(1, v.majorVersion);
         assertEquals(7, v.minorVersion);
         assertFalse(v.isJava9Compatible());
-        assertFalse(v.isJava8Compatible());
-
     }
 }
diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
index 958ab2c..ef9e541 100644
--- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java
+++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.Base64;
 import org.apache.kafka.common.utils.Utils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -33,6 +32,7 @@ import java.nio.ByteBuffer;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -293,7 +293,7 @@ public class TestUtils {
 
         // Convert into normal variant and add padding at the end.
         String originalClusterId = String.format("%s==", clusterId.replace("_", "/").replace("-", "+"));
-        byte[] decodedUuid = Base64.decoder().decode(originalClusterId);
+        byte[] decodedUuid = Base64.getDecoder().decode(originalClusterId);
 
         // We expect 16 bytes, same as the input UUID.
         assertEquals(decodedUuid.length, 16);
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
index 05248ef..d643aa2 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.connect.data;
 
-import org.apache.kafka.common.utils.Base64;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.Schema.Type;
 import org.apache.kafka.connect.errors.DataException;
@@ -31,6 +30,7 @@ import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.text.StringCharacterIterator;
 import java.util.ArrayList;
+import java.util.Base64;
 import java.util.Calendar;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -644,7 +644,7 @@ public class Values {
                 sb.append(value);
             }
         } else if (value instanceof byte[]) {
-            value = Base64.encoder().encodeToString((byte[]) value);
+            value = Base64.getEncoder().encodeToString((byte[]) value);
             if (embedded) {
                 sb.append('"').append(value).append('"');
             } else {
diff --git a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
index 0e6ea86..616d4dc 100644
--- a/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
+++ b/core/src/main/scala/kafka/admin/DelegationTokenCommand.scala
@@ -19,6 +19,7 @@ package kafka.admin
 
 import java.text.SimpleDateFormat
 import java.util
+import java.util.Base64
 
 import joptsimple.{ArgumentAcceptingOptionSpec, OptionParser}
 import kafka.utils.{CommandLineUtils, Exit, Logging}
@@ -26,7 +27,7 @@ import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.admin.{CreateDelegationTokenOptions, DescribeDelegationTokenOptions, ExpireDelegationTokenOptions, RenewDelegationTokenOptions, AdminClient => JAdminClient}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.token.delegation.DelegationToken
-import org.apache.kafka.common.utils.{Base64, SecurityUtils, Utils}
+import org.apache.kafka.common.utils.{SecurityUtils, Utils}
 
 import scala.collection.JavaConverters._
 import scala.collection.Set
@@ -112,7 +113,7 @@ object DelegationTokenCommand extends Logging {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt).longValue()
     println("Calling renew token operation with hmac :" + hmac +" , renew-time-period :"+ renewTimePeriodMs)
-    val renewResult = adminClient.renewDelegationToken(Base64.decoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs))
+    val renewResult = adminClient.renewDelegationToken(Base64.getDecoder.decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs))
     val expiryTimeStamp = renewResult.expiryTimestamp().get()
     val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
     println("Completed renew operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
@@ -123,7 +124,7 @@ object DelegationTokenCommand extends Logging {
     val hmac = opts.options.valueOf(opts.hmacOpt)
     val expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt).longValue()
     println("Calling expire token operation with hmac :" + hmac +" , expire-time-period : "+ expiryTimePeriodMs)
-    val expireResult = adminClient.expireDelegationToken(Base64.decoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
+    val expireResult = adminClient.expireDelegationToken(Base64.getDecoder.decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs))
     val expiryTimeStamp = expireResult.expiryTimestamp().get()
     val dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm")
     println("Completed expire operation. New expiry date : %s".format(dateFormat.format(expiryTimeStamp)))
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
index 62a5e20..89254bc 100644
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
@@ -20,9 +20,10 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.nio.charset.StandardCharsets
 import java.security.InvalidKeyException
+import java.util.Base64
+
 import javax.crypto.spec.SecretKeySpec
 import javax.crypto.{Mac, SecretKey}
-
 import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.utils.{CoreUtils, Json, Logging}
@@ -33,7 +34,7 @@ import org.apache.kafka.common.security.scram.internal.{ScramFormatter, ScramMec
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.security.token.delegation.internal.DelegationTokenCache
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
-import org.apache.kafka.common.utils.{Base64, Sanitizer, SecurityUtils, Time}
+import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils, Time}
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -78,7 +79,7 @@ object DelegationTokenManager {
    */
   def createBase64HMAC(tokenId: String, secretKey: SecretKey) : String = {
     val hmac = createHmac(tokenId, secretKey)
-    Base64.encoder.encodeToString(hmac)
+    Base64.getEncoder.encodeToString(hmac)
   }
 
   /**
@@ -344,7 +345,7 @@ class DelegationTokenManager(val config: KafkaConfig,
     try {
       val byteArray = new Array[Byte](hmac.remaining)
       hmac.get(byteArray)
-      val base64Pwd = Base64.encoder.encodeToString(byteArray)
+      val base64Pwd = Base64.getEncoder.encodeToString(byteArray)
       val tokenInfo = tokenCache.tokenForHmac(base64Pwd)
       if (tokenInfo == null) None else Some(new DelegationToken(tokenInfo, byteArray))
     } catch {
@@ -514,4 +515,4 @@ case class CreateTokenResult(issueTimestamp: Long,
     val fields = Seq(issueTimestamp, expiryTimestamp, maxTimestamp, tokenId, hmac, error)
     fields.map(_.hashCode()).foldLeft(0)((a, b) => 31 * a + b)
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 938828c..3a4399c 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -22,7 +22,8 @@ import java.nio._
 import java.nio.channels._
 import java.util.concurrent.locks.{Lock, ReadWriteLock}
 import java.lang.management._
-import java.util.{Properties, UUID}
+import java.util.{Base64, Properties, UUID}
+
 import javax.management._
 
 import scala.collection._
@@ -30,7 +31,7 @@ import scala.collection.mutable
 import kafka.cluster.EndPoint
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.utils.{Base64, KafkaThread, Utils}
+import org.apache.kafka.common.utils.{KafkaThread, Utils}
 import org.slf4j.event.Level
 
 /**
@@ -316,7 +317,7 @@ object CoreUtils extends Logging {
 
   def generateUuidAsBase64(): String = {
     val uuid = UUID.randomUUID()
-    Base64.urlEncoderNoPadding.encodeToString(getBytesFromUuid(uuid))
+    Base64.getUrlEncoder.withoutPadding.encodeToString(getBytesFromUuid(uuid))
   }
 
   def getBytesFromUuid(uuid: UUID): Array[Byte] = {
diff --git a/core/src/main/scala/kafka/utils/PasswordEncoder.scala b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
index ff11e24..f748a45 100644
--- a/core/src/main/scala/kafka/utils/PasswordEncoder.scala
+++ b/core/src/main/scala/kafka/utils/PasswordEncoder.scala
@@ -19,13 +19,13 @@ package kafka.utils
 import java.nio.charset.StandardCharsets
 import java.security.{AlgorithmParameters, NoSuchAlgorithmException, SecureRandom}
 import java.security.spec.AlgorithmParameterSpec
+import java.util.Base64
+
 import javax.crypto.{Cipher, SecretKeyFactory}
 import javax.crypto.spec._
-
 import kafka.utils.PasswordEncoder._
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.common.utils.Base64
 
 import scala.collection.Map
 
@@ -129,9 +129,9 @@ class PasswordEncoder(secret: Password,
     new SecretKeySpec(keyFactory.generateSecret(keySpec).getEncoded, algorithm)
   }
 
-  private def base64Encode(bytes: Array[Byte]): String = Base64.encoder.encodeToString(bytes)
+  private def base64Encode(bytes: Array[Byte]): String = Base64.getEncoder.encodeToString(bytes)
 
-  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.decoder.decode(encoded)
+  private[utils] def base64Decode(encoded: String): Array[Byte] = Base64.getDecoder.decode(encoded)
 
   private def cipherParamsInstance(cipherAlgorithm: String): CipherParamsEncoder = {
     val aesPattern = "AES/(.*)/.*".r
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index 25b2fed..7c416a2 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.utils
 
-import java.util.{Arrays, UUID}
+import java.util.{Arrays, Base64, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.concurrent.locks.ReentrantLock
@@ -29,7 +29,7 @@ import org.junit.Assert._
 import kafka.common.KafkaException
 import kafka.utils.CoreUtils.inLock
 import org.junit.Test
-import org.apache.kafka.common.utils.{Base64, Utils}
+import org.apache.kafka.common.utils.Utils
 import org.slf4j.event.Level
 
 import scala.collection.JavaConverters._
@@ -213,14 +213,14 @@ class CoreUtilsTest extends JUnitSuite with Logging {
   def testUrlSafeBase64EncodeUUID() {
 
     // Test a UUID that has no + or / characters in base64 encoding [a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
-    val clusterId1 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+    val clusterId1 = Base64.getUrlEncoder.withoutPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
       "a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
     assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
     assertEquals(clusterId1.length, 22)
     assertTrue(clusterIdPattern.matcher(clusterId1).matches())
 
     // Test a UUID that has + or / characters in base64 encoding [d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
-    val clusterId2 = Base64.urlEncoderNoPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
+    val clusterId2 = Base64.getUrlEncoder.withoutPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
       "d418ec02-277e-4853-81e6-afe30259daec")))
     assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
     assertEquals(clusterId2.length, 22)
diff --git a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
index 11a2a7a..13d0c0b 100755
--- a/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/PasswordEncoderTest.scala
@@ -23,7 +23,6 @@ import javax.crypto.SecretKeyFactory
 import kafka.server.Defaults
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.config.types.Password
-import org.apache.kafka.common.utils.Java
 import org.junit.Assert._
 import org.junit.Test
 
@@ -109,11 +108,9 @@ class PasswordEncoderTest {
     verifyEncodeDecode(keyFactoryAlg = None, "AES/CFB/PKCS5Padding", keyLength = 128)
     verifyEncodeDecode(keyFactoryAlg = None, "AES/OFB/PKCS5Padding", keyLength = 128)
     verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA1"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
-    if (Java.IS_JAVA8_COMPATIBLE) {
-      verifyEncodeDecode(keyFactoryAlg = None, "AES/GCM/PKCS5Padding", keyLength = 128)
-      verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA256"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
-      verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA512"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
-    }
+    verifyEncodeDecode(keyFactoryAlg = None, "AES/GCM/PKCS5Padding", keyLength = 128)
+    verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA256"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
+    verifyEncodeDecode(keyFactoryAlg = Some("PBKDF2WithHmacSHA512"), Defaults.PasswordEncoderCipherAlgorithm, keyLength = 128)
   }
 
   private def verifyEncodedPassword(encoder: PasswordEncoder, password: String, encoded: String): Unit = {
@@ -124,4 +121,4 @@ class PasswordEncoderTest {
     assertNotNull("Invalid encoded password", encoder.base64Decode(encodedMap(PasswordEncoder.EncyrptedPasswordProp)))
     assertEquals(password, encoder.decode(encoded).value)
   }
-}
\ No newline at end of file
+}

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.