Posted to jira@kafka.apache.org by "tinaselenge (via GitHub)" <gi...@apache.org> on 2023/01/30 11:22:00 UTC

[GitHub] [kafka] tinaselenge opened a new pull request, #13172: KAFKA-14590: Move DelegationTokenCommand to tools

tinaselenge opened a new pull request, #13172:
URL: https://github.com/apache/kafka/pull/13172

   Support delegationToken APIs in MockAdminClient.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094720518


##########
tools/src/test/java/org/apache/kafka/tools/DelegationTokenCommandTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DelegationTokenCommandTest {
+
+    @Test
+    public void testDelegationTokenRequests() throws ExecutionException, InterruptedException {
+        Admin adminClient = new MockAdminClient.Builder().build();
+
+        String renewer1 = "User:renewer1";
+        String renewer2 = "User:renewer2";
+
+        // create token1 with renewer1
+        DelegationToken tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(renewer1));
+
+        List<DelegationToken> tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
+        assertEquals(1, tokens.size());
+        DelegationToken token1 = tokens.get(0);
+        assertEquals(token1, tokenCreated);
+
+        // create token2 with renewer2
+        DelegationToken token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(renewer2));
+
+        tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
+        assertEquals(2, tokens.size());
+        assertEquals(Arrays.asList(token1, token2), tokens);
+
+        //get tokens for renewer2
+        tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(renewer2));
+        assertEquals(1, tokens.size());
+        assertEquals(Collections.singletonList(token2), tokens);
+
+        //test renewing tokens
+        Long expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String()));
+        DelegationToken renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(renewer1)).get(0);
+        assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp());
+
+        //test expire tokens
+        DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String()));
+        DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String()));
+
+        tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
+        assertEquals(0, tokens.size());
+
+        //create token with invalid renewer principal type
+        assertThrows(ExecutionException.class, () -> DelegationTokenCommand.createToken(adminClient, getCreateOpts("Group:Renewer3")));
+
+        // try describing tokens for unknown owner
+        assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts("User:Unknown")).isEmpty());
+
+    }
+
+    private DelegationTokenCommand.DelegationTokenCommandOptions getCreateOpts(String renewer) {
+        String[] args = {"--bootstrap-server", "localhost:9092", "--max-life-time-period", "-1", "--command-config", "testfile", "--create", "--renewer-principal", renewer};
+        return new DelegationTokenCommand.DelegationTokenCommandOptions(args);
+    }
+
+    private DelegationTokenCommand.DelegationTokenCommandOptions getDescribeOpts(String owner) {
+        String[] args;
+        if (!owner.equals("")) {
+            args = new String[] {"--bootstrap-server", "localhost:9092", "--command-config", "testfile", "--describe","--owner-principal", owner};
+        } else {
+            args = new String[] {"--bootstrap-server", "localhost:9092", "--command-config", "testfile", "--describe"};
+        }
+        return new DelegationTokenCommand.DelegationTokenCommandOptions(args);

Review Comment:
   Maybe the following will save us a few repetitive strings? (It needs an extra `java.util.ArrayList` import in the test.)
   ```suggestion
           List<String> args = new ArrayList<>(Arrays.asList(
               "--bootstrap-server", "localhost:9092",
               "--command-config", "testfile",
               "--describe"));
           if (!owner.equals("")) {
               args.add("--owner-principal");
               args.add(owner);
           }
           // The options constructor takes a String[], so convert the list back.
           return new DelegationTokenCommand.DelegationTokenCommandOptions(args.toArray(new String[0]));
   ```





[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094784273


##########
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##########
@@ -94,6 +100,8 @@ public class MockAdminClient extends AdminClient {
 
     private Map<MetricName, Metric> mockMetrics = new HashMap<>();
 
+    private final List<DelegationToken> alltokens = new ArrayList<>();

Review Comment:
   ```suggestion
       private final List<DelegationToken> allTokens = new ArrayList<>();
   ```





[GitHub] [kafka] tinaselenge commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "tinaselenge (via GitHub)" <gi...@apache.org>.
tinaselenge commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1436890093

   > @tinaselenge , thanks for the PR. Some questions:
   > 
   > 1. This PR creates a new `DelegationTokenCommand` class, but the old `DelegationTokenCommand` class is not removed. Why is that?
   > 2. The original `DelegationTokenCommandTest` is an integration test, but it is now a unit test that uses `MockAdminClient`. Why the change?
   > 
   > Thanks.
   
   Hi @showuon 
   
   1. I have removed the existing Scala class and its test.
   
   2. I thought testing with the mock was good enough, since the tool does not do anything cluster-specific. I understand that this changes the test behaviour. If you think we should test the tool against an integration test cluster, I'm happy to change it back. Please let me know.




[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094680573


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+        public final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+        public final AbstractOptionSpec<Void> createOpt;
+        public final AbstractOptionSpec<Void> renewOpt;
+        public final AbstractOptionSpec<Void> expiryOpt;
+        public final AbstractOptionSpec<Void> describeOpt;
+        public final ArgumentAcceptingOptionSpec<String> ownerPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<String> renewPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<Long> maxLifeTimeOpt;
+        public final ArgumentAcceptingOptionSpec<Long> renewTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<Long> expiryTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<String> hmacOpt;
+
+        public DelegationTokenCommandOptions(String[] args) {
+            super(args);
+
+            String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
+            String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+                    " operations are allowed in secure mode only. This config file is used to pass security related configs.";
+
+            this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.");

Review Comment:
   ```suggestion
               this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewer principals.");
   ```





[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094788715


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();

Review Comment:
   Since `Admin` implements `AutoCloseable`, could you use try-with-resources here (https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html)?
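
   For illustration, a minimal sketch of the pattern, not the PR's actual code. `FakeAdmin` is a hypothetical stand-in for the Kafka client so the example runs without a cluster, and `execute` only imitates the shape of the method under review. In the real method the option parsing and validation would presumably stay ahead of the `try` header, so the client is only created once the arguments are known to be valid.

   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class TryWithResourcesSketch {

       // Hypothetical stand-in for Kafka's Admin client; it only records calls.
       static class FakeAdmin implements AutoCloseable {
           final List<String> events;

           FakeAdmin(List<String> events) {
               this.events = events;
           }

           void describe() {
               events.add("describe");
           }

           @Override
           public void close() {
               events.add("close");
           }
       }

       // Opens the client, runs one action, and relies on try-with-resources
       // to close it on both the normal and the exceptional path.
       static void execute(List<String> events, boolean fail) {
           try (FakeAdmin admin = new FakeAdmin(events)) {
               if (fail) {
                   throw new IllegalStateException("simulated failure");
               }
               admin.describe();
           }
       }

       public static void main(String[] args) {
           List<String> ok = new ArrayList<>();
           execute(ok, false);
           System.out.println(ok); // [describe, close]

           List<String> failed = new ArrayList<>();
           try {
               execute(failed, true);
           } catch (IllegalStateException e) {
               System.out.println(failed + " after " + e.getMessage()); // [close] after simulated failure
           }
       }
   }
   ```

   This removes the `null` initialisation and the explicit `finally` block while keeping the same guarantee that the client is closed.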





[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094656127


##########
build.gradle:
##########
@@ -1763,6 +1763,7 @@ project(':tools') {
     implementation libs.jacksonJDK8Datatypes
     implementation libs.slf4jApi
     implementation libs.log4j
+    implementation libs.joptSimple

Review Comment:
   If you rebase on trunk there won't be a need for this anymore 😊 





[GitHub] [kafka] fvaleri commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "fvaleri (via GitHub)" <gi...@apache.org>.
fvaleri commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094601295


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());

Review Comment:
   I think we should only print the error message, because we know which command we are executing and there is a stack trace which follows.
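
   Roughly what I have in mind, as a self-contained sketch rather than a drop-in change. `TerseException` and `Action` are hypothetical stand-ins defined here so the example compiles on its own, and `printStackTrace` stands in for `Utils.stackTrace(e)`.

   ```java
   import java.io.ByteArrayOutputStream;
   import java.io.PrintStream;

   public class ErrorReportingSketch {

       // Hypothetical stand-in for the TerseException used by the tool.
       static class TerseException extends Exception {
           TerseException(String message) {
               super(message);
           }
       }

       // Hypothetical callback representing the command body.
       interface Action {
           void run() throws Exception;
       }

       static int mainNoExit(Action action, PrintStream err) {
           try {
               action.run();
               return 0;
           } catch (TerseException e) {
               err.println(e.getMessage());
               return 1;
           } catch (Throwable e) {
               // Message only, without a command-specific prefix; the stack trace follows.
               err.println(e.getMessage());
               e.printStackTrace(err);
               return 1;
           }
       }

       public static void main(String[] args) {
           ByteArrayOutputStream buffer = new ByteArrayOutputStream();
           int code = mainNoExit(() -> {
               throw new RuntimeException("boom");
           }, new PrintStream(buffer, true));
           System.out.println("exit code: " + code);
           System.out.print(buffer);
       }
   }
   ```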



##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;

Review Comment:
   This block can be replaced by a one-liner and embedded in DelegationTokenCommandOptions. Note that `count()` returns a `long`.
   
   ```java
   long numberOfActions = Stream.of(opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()).filter(b -> b).count();
   ```
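
A self-contained sketch of the counting idiom suggested above, with the four `has...Opt()` results replaced by plain booleans. The class and method names are hypothetical and do not come from the PR; the only assumption is that each flag is a boolean.

```java
import java.util.stream.Stream;

// Hypothetical helper, not part of the PR: counts how many action flags are set.
class ActionCounter {
    static long countActions(boolean create, boolean renew, boolean expire, boolean describe) {
        // Stream.of boxes each flag to Boolean; filter keeps only the true ones.
        // Stream.count() returns long, so the result is not stored in an int.
        return Stream.of(create, renew, expire, describe).filter(b -> b).count();
    }

    static boolean hasExactlyOneAction(boolean create, boolean renew, boolean expire, boolean describe) {
        return countActions(create, renew, expire, describe) == 1;
    }
}
```

A check such as `hasExactlyOneAction(...)` could live inside the options class, so that `execute` only has to ask whether the arguments are valid.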



##########
build.gradle:
##########
@@ -1763,6 +1763,7 @@ project(':tools') {
     implementation libs.jacksonJDK8Datatypes
     implementation libs.slf4jApi
     implementation libs.log4j
+    implementation libs.joptSimple

Review Comment:
   This change is already merged, so you can remove it and rebase.



##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);

Review Comment:
   The Admin interface extends AutoCloseable, so we can use try-with-resources.
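
A minimal sketch of what try-with-resources would give here, using a hypothetical `FakeAdmin` stand-in so the example runs without a broker or the Kafka client on the classpath. Everything in it is illustrative; only the fact that `Admin` extends `AutoCloseable` comes from the comment above.

```java
// Hypothetical stand-in for the admin client; it only records whether close() ran.
class FakeAdmin implements AutoCloseable {
    boolean closed = false;

    String describe() {
        return "described";
    }

    @Override
    public void close() {
        closed = true;
    }
}

class TryWithResourcesSketch {
    // Normal path: close() runs when the try block ends, with no finally block or null check.
    static FakeAdmin run() {
        FakeAdmin admin = new FakeAdmin();
        try (FakeAdmin a = admin) {
            a.describe();
        }
        return admin;
    }

    // Failure path: the resource is closed before the catch block executes.
    static boolean closedAfterFailure() {
        FakeAdmin admin = new FakeAdmin();
        try (FakeAdmin a = admin) {
            throw new IllegalStateException("simulated failure");
        } catch (IllegalStateException e) {
            return admin.closed;
        }
    }
}
```

In the command itself this would presumably mean parsing and validating the options first and then opening the client in the try header, which removes the `Admin adminClient = null` declaration and the `finally` block.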



##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);

Review Comment:
   I would move all the `opts.options...` accesses into DelegationTokenCommandOptions.
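
One way to read this suggestion, sketched with a hypothetical options class backed by a plain `Map` instead of joptsimple's parsed option set, so that it runs on its own. The class name and accessor names are assumptions; the option names `max-life-time-period` and `hmac` are taken from the messages the command prints.

```java
import java.util.Map;

// Hypothetical stand-in for DelegationTokenCommandOptions; the Map replaces the parsed options.
class TokenOptionsSketch {
    private final Map<String, String> parsed;

    TokenOptionsSketch(Map<String, String> parsed) {
        this.parsed = parsed;
    }

    // Callers ask for a typed value instead of reaching into the parsed options themselves.
    long maxLifeTimeMs() {
        return Long.parseLong(parsed.get("max-life-time-period"));
    }

    String hmac() {
        return parsed.get("hmac");
    }

    boolean hasHmac() {
        return parsed.containsKey("hmac");
    }
}
```

With accessors like these, methods such as `createToken` and `renewToken` would call `opts.maxLifeTimeMs()` or `opts.hmac()`, and the option specs themselves would not need to be public fields.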



##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;

Review Comment:
   Unless the concrete type is required, I would suggest always declaring these fields with the OptionSpec interface.



##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();

Review Comment:
   I don't think we need this extra `System.out.println()`.



##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);

Review Comment:
   ```suggestion
               System.out.printf("Calling describe token operation for owners: %s%n", ownerPrincipals);
   ```
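
For context on the suggestion above, here is a minimal sketch (not code from the PR) comparing the two ways of producing the message. The class and method names are hypothetical, and a plain `List<String>` stands in for the list of `KafkaPrincipal` objects. It assumes only standard `java.lang.String.format` behaviour: `%s` calls `toString()` on its argument and `%n` expands to the platform line separator.

```java
import java.util.Arrays;
import java.util.List;

public class PrintfNewline {
    // Hypothetical helper: what the original println-with-concatenation writes,
    // including the line separator that println appends.
    static String viaConcat(List<String> owners) {
        return "Calling describe token operation for owners :" + owners + System.lineSeparator();
    }

    // Hypothetical helper: what the suggested printf call writes.
    // %s formats the list through toString(); %n adds the platform line separator.
    static String viaFormat(List<String> owners) {
        return String.format("Calling describe token operation for owners: %s%n", owners);
    }

    public static void main(String[] args) {
        List<String> owners = Arrays.asList("User:alice", "User:bob");
        System.out.print(viaConcat(owners));
        System.out.print(viaFormat(owners));
    }
}
```

Unlike `println`, `printf` does not terminate the line on its own, so the `%n` in the suggested format string is what keeps later output on a separate line.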



##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))

Review Comment:
   Why are you using Object instead of String here?
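
A minimal sketch of the typed loop this comment appears to ask for, not the PR's final code. In jopt-simple, `OptionSet.valuesOf(OptionSpec<V>)` returns a `List<V>`, so with an `OptionSpec<String>` the loop variable can be a `String` and the `toString()` call becomes unnecessary. To keep the sketch self-contained, a plain `List<String>` stands in for the parsed option values, and `parsePrincipal` is a hypothetical stand-in for `SecurityUtils.parseKafkaPrincipal`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TypedPrincipalLoop {
    // Hypothetical stand-in for SecurityUtils.parseKafkaPrincipal:
    // splits "principalType:principalName" into its two parts.
    static String[] parsePrincipal(String str) {
        String[] parts = str.split(":", 2);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException(
                "Expected principalType:principalName but got " + str);
        }
        return parts;
    }

    // The loop variable is declared as String, matching the element type
    // of the list, so no toString() call is needed before trim().
    static List<String[]> getPrincipals(List<String> values) {
        List<String[]> principals = new ArrayList<>();
        for (String value : values) {
            principals.add(parsePrincipal(value.trim()));
        }
        return principals;
    }

    public static void main(String[] args) {
        for (String[] p : getPrincipals(Arrays.asList(" User:alice ", "User:bob"))) {
            System.out.println(p[0] + " -> " + p[1]);
        }
    }
}
```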





[GitHub] [kafka] showuon commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1451363118

   The failed tests are unrelated to this change:
   ```
       Build / JDK 17 and Scala 2.13 / org.apache.kafka.controller.QuorumControllerTest.testUnregisterBroker()
       Build / JDK 8 and Scala 2.12 / org.apache.kafka.connect.integration.ConnectWorkerIntegrationTest.testBrokerCoordinator
       Build / JDK 8 and Scala 2.12 / kafka.server.KafkaServerKRaftRegistrationTest.[1] Type=ZK, Name=testRegisterZkBrokerInKraft, MetadataVersion=3.4-IV0, Security=PLAINTEXT
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.clients.consumer.internals.EagerConsumerCoordinatorTest.testPrepareJoinAndRejoinAfterFailedRebalance()
       Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
       Build / JDK 11 and Scala 2.13 / kafka.server.DynamicBrokerReconfigurationTest.testTrustStoreAlter(String).quorum=kraft
       Build / JDK 11 and Scala 2.13 / org.apache.kafka.streams.processor.internals.StreamsAssignmentScaleTest.testHighAvailabilityTaskAssignorManyStandbys
   ```




[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094680573


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+        public final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+        public final AbstractOptionSpec<Void> createOpt;
+        public final AbstractOptionSpec<Void> renewOpt;
+        public final AbstractOptionSpec<Void> expiryOpt;
+        public final AbstractOptionSpec<Void> describeOpt;
+        public final ArgumentAcceptingOptionSpec<String> ownerPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<String> renewPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<Long> maxLifeTimeOpt;
+        public final ArgumentAcceptingOptionSpec<Long> renewTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<Long> expiryTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<String> hmacOpt;
+
+        public DelegationTokenCommandOptions(String[] args) {
+            super(args);
+
+            String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
+            String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+                    " operations are allowed in secure mode only. This config file is used to pass security related configs.";
+
+            this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.");

Review Comment:
   ```suggestion
               this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewer principals.");
   ```





[GitHub] [kafka] tinaselenge commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "tinaselenge (via GitHub)" <gi...@apache.org>.
tinaselenge commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1441925588

   @showuon I agree. I will open a Jira issue to add an integration test for this.




[GitHub] [kafka] tinaselenge commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "tinaselenge (via GitHub)" <gi...@apache.org>.
tinaselenge commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1433059514

   Thank you @fvaleri and @clolov for reviewing the PR.




[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094686108


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+        public final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+        public final AbstractOptionSpec<Void> createOpt;
+        public final AbstractOptionSpec<Void> renewOpt;
+        public final AbstractOptionSpec<Void> expiryOpt;
+        public final AbstractOptionSpec<Void> describeOpt;
+        public final ArgumentAcceptingOptionSpec<String> ownerPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<String> renewPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<Long> maxLifeTimeOpt;
+        public final ArgumentAcceptingOptionSpec<Long> renewTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<Long> expiryTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<String> hmacOpt;
+
+        public DelegationTokenCommandOptions(String[] args) {
+            super(args);
+
+            String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
+            String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+                    " operations are allowed in secure mode only. This config file is used to pass security related configs.";
+
+            this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.");
+            this.renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.");
+            this.expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.");
+            this.describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
+                    " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.");
+
+            this.ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.")
+                    .withOptionalArg()
+                    .ofType(String.class);
+
+            this.renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a kafka principal. It is should be in principalType:name format.")
+                    .withOptionalArg()
+                    .ofType(String.class);
+
+            this.maxLifeTimeOpt = parser.accepts("max-life-time-period", "Max life period for the token in milliseconds. If the value is -1," +
+                            " then token max life time will default to a server side config the value (delegation.token.max.lifetime.ms).")
+                    .withOptionalArg()
+                    .ofType(Long.class);
+
+            this.renewTimePeriodOpt = parser.accepts("renew-time-period", "Renew time period in milliseconds. If the value is -1, then the" +
+                            " renew time period will default to a server side config the value (delegation.token.expiry.time.ms).")

Review Comment:
   ```suggestion
                               " renew time period will default to the server side config value of (delegation.token.expiry.time.ms).")
   ```



[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094783958


##########
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##########
@@ -596,22 +604,89 @@ synchronized public DeleteRecordsResult deleteRecords(Map<TopicPartition, Record
 
     @Override
     synchronized public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        KafkaFutureImpl<DelegationToken> future = new KafkaFutureImpl<>();
+
+        for (KafkaPrincipal renewer : options.renewers()) {
+            if (!renewer.getPrincipalType().equals(KafkaPrincipal.USER_TYPE)) {
+                future.completeExceptionally(new InvalidPrincipalTypeException(""));
+                return new CreateDelegationTokenResult(future);
+            }
+        }
+
+        String tokenId = Uuid.randomUuid().toString();
+        TokenInformation tokenInfo = new TokenInformation(tokenId,options.renewers().get(0), options.renewers(),System.currentTimeMillis(), options.maxlifeTimeMs(), -1);
+        DelegationToken token = new DelegationToken(tokenInfo,tokenId.getBytes());

Review Comment:
   ```suggestion
           DelegationToken token = new DelegationToken(tokenInfo, tokenId.getBytes());
   ```
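
A side note on the `tokenId.getBytes()` call in the line under review: it encodes with the JVM's default charset, and the command in this PR later decodes the HMAC it receives with `Base64.getDecoder().decode(hmac)`. The sketch below illustrates that round trip in isolation using only the JDK. The class `HmacRoundTrip` and its method names are hypothetical and not part of the PR, and the explicit UTF-8 charset is an assumption, not what MockAdminClient does.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper, not part of the PR: shows a token id surviving the
// bytes -> Base64 string -> bytes round trip used for the HMAC argument.
final class HmacRoundTrip {
    // Encode the id with an explicit charset instead of the JVM default.
    static String toBase64(String tokenId) {
        byte[] hmac = tokenId.getBytes(StandardCharsets.UTF_8);
        return Base64.getEncoder().encodeToString(hmac);
    }

    // Reverse step, mirroring Base64.getDecoder().decode(hmac) in the command.
    static String fromBase64(String hmacBase64) {
        byte[] hmac = Base64.getDecoder().decode(hmacBase64);
        return new String(hmac, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        String encoded = toBase64("token-1");
        System.out.println(encoded + " -> " + fromBase64(encoded));
    }
}
```

Pinning the charset keeps the mock's HMAC bytes identical across platforms, which may matter if a test ever compares them against a fixed value.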



[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094714174


##########
tools/src/test/java/org/apache/kafka/tools/DelegationTokenCommandTest.java:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DelegationTokenCommandTest {
+
+    @Test
+    public void testDelegationTokenRequests() throws ExecutionException, InterruptedException {
+        Admin adminClient = new MockAdminClient.Builder().build();
+
+        String renewer1 = "User:renewer1";
+        String renewer2 = "User:renewer2";
+
+        // create token1 with renewer1
+        DelegationToken tokenCreated = DelegationTokenCommand.createToken(adminClient, getCreateOpts(renewer1));
+
+        List<DelegationToken> tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
+        assertEquals(1, tokens.size());
+        DelegationToken token1 = tokens.get(0);
+        assertEquals(token1, tokenCreated);
+
+        // create token2 with renewer2
+        DelegationToken token2 = DelegationTokenCommand.createToken(adminClient, getCreateOpts(renewer2));
+
+        tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
+        assertEquals(2, tokens.size());
+        assertEquals(Arrays.asList(token1, token2), tokens);
+
+        //get tokens for renewer2
+        tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(renewer2));
+        assertEquals(1, tokens.size());
+        assertEquals(Collections.singletonList(token2), tokens);
+
+        //test renewing tokens
+        Long expiryTimestamp = DelegationTokenCommand.renewToken(adminClient, getRenewOpts(token1.hmacAsBase64String()));
+        DelegationToken renewedToken = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(renewer1)).get(0);
+        assertEquals(expiryTimestamp, renewedToken.tokenInfo().expiryTimestamp());
+
+        //test expire tokens
+        DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token1.hmacAsBase64String()));
+        DelegationTokenCommand.expireToken(adminClient, getExpireOpts(token2.hmacAsBase64String()));
+
+        tokens = DelegationTokenCommand.describeToken(adminClient, getDescribeOpts(""));
+        assertEquals(0, tokens.size());
+
+        //create token with invalid renewer principal type
+        assertThrows(ExecutionException.class, () -> DelegationTokenCommand.createToken(adminClient, getCreateOpts("Group:Renewer3")));
+
+        // try describing tokens for unknown owner
+        assertTrue(DelegationTokenCommand.describeToken(adminClient, getDescribeOpts("User:Unknown")).isEmpty());
+
+    }
+
+    private DelegationTokenCommand.DelegationTokenCommandOptions getCreateOpts(String renewer) {
+        String[] args = {"--bootstrap-server", "localhost:9092", "--max-life-time-period", "-1", "--command-config", "testfile", "--create", "--renewer-principal", renewer};
+        return new DelegationTokenCommand.DelegationTokenCommandOptions(args);
+    }
+
+    private DelegationTokenCommand.DelegationTokenCommandOptions getDescribeOpts(String owner) {
+        String[] args;
+        if (!owner.equals("")) {
+            args = new String[] {"--bootstrap-server", "localhost:9092", "--command-config", "testfile", "--describe","--owner-principal", owner};

Review Comment:
   ```suggestion
               args = new String[] {"--bootstrap-server", "localhost:9092", "--command-config", "testfile", "--describe", "--owner-principal", owner};
   ```
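
As an aside on the argument array in the line under review, one way to avoid repeating the common flags in both branches is to build the list once and append the owner flag only when it is needed. This is a minimal sketch and not code from the PR: the class `DescribeArgs` is hypothetical, and it assumes the branch not shown above simply omits `--owner-principal`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical test helper, not part of the PR.
final class DescribeArgs {
    static String[] build(String owner) {
        // Flags shared by every describe invocation in the test.
        List<String> args = new ArrayList<>(Arrays.asList(
                "--bootstrap-server", "localhost:9092",
                "--command-config", "testfile",
                "--describe"));
        // Assumption: an empty owner means the owner flag is left out.
        if (!owner.isEmpty()) {
            args.add("--owner-principal");
            args.add(owner);
        }
        return args.toArray(new String[0]);
    }

    public static void main(String[] unused) {
        System.out.println(String.join(" ", build("User:renewer2")));
    }
}
```

Building the list once also removes the chance of a spacing slip like the one flagged in this suggestion appearing in only one of the two literals.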



[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094781664


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+        public final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+        public final AbstractOptionSpec<Void> createOpt;
+        public final AbstractOptionSpec<Void> renewOpt;
+        public final AbstractOptionSpec<Void> expiryOpt;
+        public final AbstractOptionSpec<Void> describeOpt;
+        public final ArgumentAcceptingOptionSpec<String> ownerPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<String> renewPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<Long> maxLifeTimeOpt;
+        public final ArgumentAcceptingOptionSpec<Long> renewTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<Long> expiryTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<String> hmacOpt;
+
+        public DelegationTokenCommandOptions(String[] args) {
+            super(args);
+
+            String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
+            String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+                    " operations are allowed in secure mode only. This config file is used to pass security related configs.";
+
+            this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.");
+            this.renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.");
+            this.expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.");
+            this.describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
+                    " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.");
+
+            this.ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.")
+                    .withOptionalArg()
+                    .ofType(String.class);
+
+            this.renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a kafka principal. It is should be in principalType:name format.")

Review Comment:
   ```suggestion
               this.renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a Kafka principal. They should be in principalType:name format.")
   ```





[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094781236


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+        public final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+        public final AbstractOptionSpec<Void> createOpt;
+        public final AbstractOptionSpec<Void> renewOpt;
+        public final AbstractOptionSpec<Void> expiryOpt;
+        public final AbstractOptionSpec<Void> describeOpt;
+        public final ArgumentAcceptingOptionSpec<String> ownerPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<String> renewPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<Long> maxLifeTimeOpt;
+        public final ArgumentAcceptingOptionSpec<Long> renewTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<Long> expiryTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<String> hmacOpt;
+
+        public DelegationTokenCommandOptions(String[] args) {
+            super(args);
+
+            String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
+            String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+                    " operations are allowed in secure mode only. This config file is used to pass security related configs.";
+
+            this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.");
+            this.renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.");
+            this.expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.");
+            this.describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
+                    " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.");
+
+            this.ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.")

Review Comment:
   ```suggestion
               this.ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a Kafka principal. They should be in principalType:name format.")
   ```
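
For context on the `principalType:name` format named in this help text, here is a minimal, standalone sketch of how such a string could be split and validated. `PrincipalFormatSketch` and its `parse` method are hypothetical names used only for illustration; the command itself calls `SecurityUtils.parseKafkaPrincipal`, whose exact behavior is not reproduced here.

```java
import java.util.Arrays;

public class PrincipalFormatSketch {
    // Hypothetical helper (not Kafka's API): splits "principalType:name" into its two parts.
    public static String[] parse(String value) {
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("expected principalType:name but got an empty value");
        }
        // Limit of 2 so that a name containing ':' stays intact.
        String[] parts = value.trim().split(":", 2);
        if (parts.length != 2 || parts[0].isEmpty() || parts[1].isEmpty()) {
            throw new IllegalArgumentException("expected principalType:name but got: " + value);
        }
        return parts;
    }

    public static void main(String[] args) {
        // Prints the type and the name as a two-element array.
        System.out.println(Arrays.toString(parse("User:alice")));
    }
}
```

A value such as `alice` (no type) or `User:` (no name) is rejected with an `IllegalArgumentException` in this sketch, which is the kind of input the help text is meant to steer users away from.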





[GitHub] [kafka] showuon commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1441641543

   @tinaselenge, for (2), it looks complicated because the tools module has no `BaseRequestTest` or `SaslSetup` to help set up the delegation token integration tests. Since that could make this PR much more complicated, I think we can create a separate JIRA ticket to track the integration tests, so that they don't block the current task. WDYT?




[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094685504


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+        public final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+        public final AbstractOptionSpec<Void> createOpt;
+        public final AbstractOptionSpec<Void> renewOpt;
+        public final AbstractOptionSpec<Void> expiryOpt;
+        public final AbstractOptionSpec<Void> describeOpt;
+        public final ArgumentAcceptingOptionSpec<String> ownerPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<String> renewPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<Long> maxLifeTimeOpt;
+        public final ArgumentAcceptingOptionSpec<Long> renewTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<Long> expiryTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<String> hmacOpt;
+
+        public DelegationTokenCommandOptions(String[] args) {
+            super(args);
+
+            String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
+            String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+                    " operations are allowed in secure mode only. This config file is used to pass security related configs.";
+
+            this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.");
+            this.renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.");
+            this.expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.");
+            this.describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
+                    " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.");
+
+            this.ownerPrincipalsOpt = parser.accepts("owner-principal", "owner is a kafka principal. It is should be in principalType:name format.")
+                    .withOptionalArg()
+                    .ofType(String.class);
+
+            this.renewPrincipalsOpt = parser.accepts("renewer-principal", "renewer is a kafka principal. It is should be in principalType:name format.")
+                    .withOptionalArg()
+                    .ofType(String.class);
+
+            this.maxLifeTimeOpt = parser.accepts("max-life-time-period", "Max life period for the token in milliseconds. If the value is -1," +
+                            " then token max life time will default to a server side config the value (delegation.token.max.lifetime.ms).")

Review Comment:
   ```suggestion
                               " then token max life time will default to the server side config value of (delegation.token.max.lifetime.ms).")
   ```
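
To illustrate the behavior this help text describes, a minimal sketch follows. It assumes that a value of -1 simply means "use the broker's `delegation.token.max.lifetime.ms`"; `MaxLifetimeSketch`, `resolveMaxLifetimeMs`, and the numeric values are hypothetical, and the real broker logic may also cap or validate the requested value.

```java
public class MaxLifetimeSketch {
    // Hypothetical helper: -1 means "fall back to the server-side default".
    public static long resolveMaxLifetimeMs(long requestedMs, long serverDefaultMs) {
        return requestedMs == -1L ? serverDefaultMs : requestedMs;
    }

    public static void main(String[] args) {
        // Assumed value for illustration only: 7 days expressed in milliseconds.
        long serverDefaultMs = 604_800_000L;
        System.out.println(resolveMaxLifetimeMs(-1L, serverDefaultMs));        // falls back to the default
        System.out.println(resolveMaxLifetimeMs(3_600_000L, serverDefaultMs)); // keeps the explicit value
    }
}
```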





[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094783703


##########
clients/src/test/java/org/apache/kafka/clients/admin/MockAdminClient.java:
##########
@@ -596,22 +604,89 @@ synchronized public DeleteRecordsResult deleteRecords(Map<TopicPartition, Record
 
     @Override
     synchronized public CreateDelegationTokenResult createDelegationToken(CreateDelegationTokenOptions options) {
-        throw new UnsupportedOperationException("Not implemented yet");
+        KafkaFutureImpl<DelegationToken> future = new KafkaFutureImpl<>();
+
+        for (KafkaPrincipal renewer : options.renewers()) {
+            if (!renewer.getPrincipalType().equals(KafkaPrincipal.USER_TYPE)) {
+                future.completeExceptionally(new InvalidPrincipalTypeException(""));
+                return new CreateDelegationTokenResult(future);
+            }
+        }
+
+        String tokenId = Uuid.randomUuid().toString();
+        TokenInformation tokenInfo = new TokenInformation(tokenId,options.renewers().get(0), options.renewers(),System.currentTimeMillis(), options.maxlifeTimeMs(), -1);

Review Comment:
   ```suggestion
           TokenInformation tokenInfo = new TokenInformation(tokenId, options.renewers().get(0), options.renewers(), System.currentTimeMillis(), options.maxlifeTimeMs(), -1);
   ```
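
The renewer check in the hunk above can be sketched without any Kafka classes. This is an illustration under stated assumptions, not the mock's actual code: `Principal` stands in for `KafkaPrincipal`, `CompletableFuture` for `KafkaFutureImpl`, `IllegalArgumentException` for `InvalidPrincipalTypeException`, and the returned string is a placeholder for a real token.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Simplified stand-ins (assumptions): see the lead-in for what each one replaces.
final class RenewerCheckSketch {
    // Same string value as KafkaPrincipal.USER_TYPE.
    static final String USER_TYPE = "User";

    static final class Principal {
        final String type;
        final String name;

        Principal(String type, String name) {
            this.type = type;
            this.name = name;
        }
    }

    // Fails the future on the first renewer that is not a User principal;
    // otherwise completes it with a placeholder token id.
    static CompletableFuture<String> createToken(List<Principal> renewers) {
        CompletableFuture<String> future = new CompletableFuture<>();
        for (Principal renewer : renewers) {
            if (!USER_TYPE.equals(renewer.type)) {
                future.completeExceptionally(
                        new IllegalArgumentException("Invalid principal type: " + renewer.type));
                return future;
            }
        }
        future.complete("token-for-" + renewers.size() + "-renewers");
        return future;
    }
}
```

Returning a failed future rather than throwing keeps the error on the same path a caller would see from an asynchronous admin call.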





[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094780253


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+        public final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+        public final AbstractOptionSpec<Void> createOpt;
+        public final AbstractOptionSpec<Void> renewOpt;
+        public final AbstractOptionSpec<Void> expiryOpt;
+        public final AbstractOptionSpec<Void> describeOpt;
+        public final ArgumentAcceptingOptionSpec<String> ownerPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<String> renewPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<Long> maxLifeTimeOpt;
+        public final ArgumentAcceptingOptionSpec<Long> renewTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<Long> expiryTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<String> hmacOpt;
+
+        public DelegationTokenCommandOptions(String[] args) {
+            super(args);
+
+            String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
+            String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+                    " operations are allowed in secure mode only. This config file is used to pass security related configs.";
+
+            this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.");
+            this.renewOpt = parser.accepts("renew", "Renew delegation token. Use --renew-time-period option to set renew time period.");
+            this.expiryOpt = parser.accepts("expire", "Expire delegation token. Use --expiry-time-period option to expire the token.");
+            this.describeOpt = parser.accepts("describe", "Describe delegation tokens for the given principals. Use --owner-principal to pass owner/renewer principals." +
+                    " If --owner-principal option is not supplied, all the user owned tokens and tokens where user have Describe permission will be returned.");

Review Comment:
   ```suggestion
                       " If --owner-principal option is not supplied, all the user-owned tokens and tokens where the user has Describe permissions will be returned.");
   ```
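
The `execute` method in the hunk above rejects any invocation that does not select exactly one of `--create`, `--renew`, `--expire` or `--describe`. A minimal standalone sketch of that check follows; the class and method names are hypothetical, and plain booleans stand in for the `opts.has...Opt()` calls.

```java
// Hypothetical illustration of the "exactly one action" rule; not code from the PR.
final class ActionCountSketch {
    // Counts how many of the given flags are set.
    static int countSelected(boolean... flags) {
        int selected = 0;
        for (boolean flag : flags) {
            if (flag) {
                selected++;
            }
        }
        return selected;
    }

    // True only when a single action flag is present.
    static boolean hasExactlyOneAction(boolean create, boolean renew, boolean expire, boolean describe) {
        return countSelected(create, renew, expire, describe) == 1;
    }

    public static void main(String[] args) {
        System.out.println(hasExactlyOneAction(true, false, false, false));
        System.out.println(hasExactlyOneAction(true, false, false, true));
        System.out.println(hasExactlyOneAction(false, false, false, false));
    }
}
```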





[GitHub] [kafka] clolov commented on a diff in pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "clolov (via GitHub)" <gi...@apache.org>.
clolov commented on code in PR #13172:
URL: https://github.com/apache/kafka/pull/13172#discussion_r1094680573


##########
tools/src/main/java/org/apache/kafka/tools/DelegationTokenCommand.java:
##########
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import joptsimple.AbstractOptionSpec;
+import joptsimple.ArgumentAcceptingOptionSpec;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.CreateDelegationTokenOptions;
+import org.apache.kafka.clients.admin.CreateDelegationTokenResult;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenOptions;
+import org.apache.kafka.clients.admin.DescribeDelegationTokenResult;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenOptions;
+import org.apache.kafka.clients.admin.ExpireDelegationTokenResult;
+import org.apache.kafka.clients.admin.RenewDelegationTokenOptions;
+import org.apache.kafka.clients.admin.RenewDelegationTokenResult;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+
+public class DelegationTokenCommand {
+    public static void main(String... args) {
+        Exit.exit(mainNoExit(args));
+    }
+
+    static int mainNoExit(String... args) {
+        try {
+            execute(args);
+            return 0;
+        } catch (TerseException e) {
+            System.err.println(e.getMessage());
+            return 1;
+        } catch (Throwable e) {
+            System.err.println("Error while executing delegation token command : " + e.getMessage());
+            System.err.println(Utils.stackTrace(e));
+            return 1;
+        }
+    }
+
+    static void execute(String... args) throws Exception {
+        Admin adminClient = null;
+        try {
+            DelegationTokenCommandOptions opts = new DelegationTokenCommandOptions(args);
+            CommandLineUtils.maybePrintHelpOrVersion(opts, "This tool helps to create, renew, expire, or describe delegation tokens.");
+
+            // should have exactly one action
+            int numberOfAction = 0;
+            for (Boolean opt : new Boolean[]{opts.hasCreateOpt(), opts.hasRenewOpt(), opts.hasExpireOpt(), opts.hasDescribeOpt()}) {
+                if (opt) {
+                    numberOfAction++;
+                }
+            }
+            if (numberOfAction != 1) {
+                CommandLineUtils.printUsageAndExit(opts.parser, "Command must include exactly one action: --create, --renew, --expire or --describe");
+            }
+
+            opts.checkArgs();
+
+            adminClient = createAdminClient(opts);
+
+            if (opts.hasCreateOpt()) {
+                createToken(adminClient, opts);
+            } else if (opts.hasRenewOpt()) {
+                renewToken(adminClient, opts);
+            } else if (opts.hasExpireOpt()) {
+                expireToken(adminClient, opts);
+            } else if (opts.hasDescribeOpt()) {
+                describeToken(adminClient, opts);
+            }
+
+        } finally {
+            if (adminClient != null)
+                adminClient.close();
+        }
+    }
+
+    public static DelegationToken createToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> renewerPrincipals = getPrincipals(opts, opts.renewPrincipalsOpt);
+        Long maxLifeTimeMs = opts.options.valueOf(opts.maxLifeTimeOpt);
+
+        System.out.println("Calling create token operation with renewers :" + renewerPrincipals + " , max-life-time-period :" + maxLifeTimeMs);
+        CreateDelegationTokenOptions createDelegationTokenOptions = new CreateDelegationTokenOptions().maxlifeTimeMs(maxLifeTimeMs).renewers(renewerPrincipals);
+
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+        if (!ownerPrincipals.isEmpty()) {
+            createDelegationTokenOptions.owner(ownerPrincipals.get(0));
+        }
+
+        CreateDelegationTokenResult createResult = adminClient.createDelegationToken(createDelegationTokenOptions);
+        DelegationToken token = createResult.delegationToken().get();
+        System.out.println("Created delegation token with tokenId : " + token.tokenInfo().tokenId());
+        printToken(Collections.singletonList(token));
+
+        return token;
+    }
+
+    private static void printToken(List<DelegationToken> tokens) {
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n", "TOKENID", "HMAC", "OWNER", "REQUESTER", "RENEWERS", "ISSUEDATE", "EXPIRYDATE", "MAXDATE");
+
+        for (DelegationToken token : tokens) {
+            TokenInformation tokenInfo = token.tokenInfo();
+            System.out.printf("%n%-15s %-30s %-15s %-15s %-25s %-15s %-15s %-15s%n",
+                    tokenInfo.tokenId(),
+                    token.hmacAsBase64String(),
+                    tokenInfo.owner(),
+                    tokenInfo.tokenRequester(),
+                    tokenInfo.renewersAsString(),
+                    dateFormat.format(tokenInfo.issueTimestamp()),
+                    dateFormat.format(tokenInfo.expiryTimestamp()),
+                    dateFormat.format(tokenInfo.maxTimestamp()));
+            System.out.println();
+        }
+    }
+
+    private static List<KafkaPrincipal> getPrincipals(DelegationTokenCommandOptions opts, OptionSpec<String> principalOptionSpec) {
+        List<KafkaPrincipal> principals = new ArrayList<>();
+
+        if (opts.options.has(principalOptionSpec)) {
+            for (Object e : opts.options.valuesOf(principalOptionSpec))
+                principals.add(SecurityUtils.parseKafkaPrincipal(e.toString().trim()));
+        }
+        return principals;
+    }
+
+    public static Long renewToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long renewTimePeriodMs = opts.options.valueOf(opts.renewTimePeriodOpt);
+
+        System.out.println("Calling renew token operation with hmac :" + hmac + " , renew-time-period :" + renewTimePeriodMs);
+        RenewDelegationTokenResult renewResult = adminClient.renewDelegationToken(Base64.getDecoder().decode(hmac), new RenewDelegationTokenOptions().renewTimePeriodMs(renewTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed renew operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+        return expiryTimeStamp;
+    }
+
+    public static void expireToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        String hmac = opts.options.valueOf(opts.hmacOpt);
+        Long expiryTimePeriodMs = opts.options.valueOf(opts.expiryTimePeriodOpt);
+
+        System.out.println("Calling expire token operation with hmac :" + hmac + " , expire-time-period :" + expiryTimePeriodMs);
+        ExpireDelegationTokenResult renewResult = adminClient.expireDelegationToken(Base64.getDecoder().decode(hmac), new ExpireDelegationTokenOptions().expiryTimePeriodMs(expiryTimePeriodMs));
+        Long expiryTimeStamp = renewResult.expiryTimestamp().get();
+        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm");
+        System.out.printf("Completed expire operation. New expiry date : %s", dateFormat.format(expiryTimeStamp));
+    }
+
+    public static List<DelegationToken> describeToken(Admin adminClient, DelegationTokenCommandOptions opts) throws ExecutionException, InterruptedException {
+        List<KafkaPrincipal> ownerPrincipals = getPrincipals(opts, opts.ownerPrincipalsOpt);
+
+        if (ownerPrincipals.isEmpty()) {
+            System.out.println("Calling describe token operation for current user.");
+        } else {
+            System.out.println("Calling describe token operation for owners :" + ownerPrincipals);
+        }
+
+        DescribeDelegationTokenResult describeResult = adminClient.describeDelegationToken(new DescribeDelegationTokenOptions().owners(ownerPrincipals));
+        List<DelegationToken> tokens = describeResult.delegationTokens().get();
+        System.out.printf("Total number of tokens : %d", tokens.size());
+        printToken(tokens);
+        return tokens;
+    }
+
+    private static Admin createAdminClient(DelegationTokenCommandOptions opts) throws IOException {
+        Properties props = Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt));
+        props.put("bootstrap.servers", opts.options.valueOf(opts.bootstrapServerOpt));
+        return Admin.create(props);
+    }
+
+    static class DelegationTokenCommandOptions extends CommandDefaultOptions {
+        public final ArgumentAcceptingOptionSpec<String> bootstrapServerOpt;
+        public final ArgumentAcceptingOptionSpec<String> commandConfigOpt;
+        public final AbstractOptionSpec<Void> createOpt;
+        public final AbstractOptionSpec<Void> renewOpt;
+        public final AbstractOptionSpec<Void> expiryOpt;
+        public final AbstractOptionSpec<Void> describeOpt;
+        public final ArgumentAcceptingOptionSpec<String> ownerPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<String> renewPrincipalsOpt;
+        public final ArgumentAcceptingOptionSpec<Long> maxLifeTimeOpt;
+        public final ArgumentAcceptingOptionSpec<Long> renewTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<Long> expiryTimePeriodOpt;
+        public final ArgumentAcceptingOptionSpec<String> hmacOpt;
+
+        public DelegationTokenCommandOptions(String[] args) {
+            super(args);
+
+            String bootstrapServerDoc = "REQUIRED: server(s) to use for bootstrapping.";
+            String commandConfigDoc = "REQUIRED: A property file containing configs to be passed to Admin Client. Token management" +
+                    " operations are allowed in secure mode only. This config file is used to pass security related configs.";
+
+            this.bootstrapServerOpt = parser.accepts("bootstrap-server", bootstrapServerDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.commandConfigOpt = parser.accepts("command-config", commandConfigDoc)
+                    .withRequiredArg()
+                    .ofType(String.class);
+
+            this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewers principals.");

Review Comment:
   ```suggestion
               this.createOpt = parser.accepts("create", "Create a new delegation token. Use --renewer-principal option to pass renewer principals.");
   ```
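
The principal options in this file expect values in `principalType:name` form, which the tool passes to `SecurityUtils.parseKafkaPrincipal`. The sketch below is a simplified stand-in for that parsing, written only to show the expected input shape. The class name, the returned `String[]`, and the exact error message are assumptions and do not reflect Kafka's implementation.

```java
// Simplified stand-in for principal parsing; illustration only, not Kafka's implementation.
final class PrincipalParseSketch {
    // Splits "principalType:name" at the first colon and returns {type, name}.
    // Throws IllegalArgumentException when either part is missing.
    static String[] parse(String value) {
        String trimmed = value.trim();
        int separator = trimmed.indexOf(':');
        if (separator <= 0 || separator == trimmed.length() - 1) {
            throw new IllegalArgumentException("Expected principalType:name but got: " + value);
        }
        return new String[] {trimmed.substring(0, separator), trimmed.substring(separator + 1)};
    }

    public static void main(String[] args) {
        String[] parts = parse(" User:alice ");
        System.out.println(parts[0] + " / " + parts[1]);
    }
}
```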





[GitHub] [kafka] showuon merged pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon merged PR #13172:
URL: https://github.com/apache/kafka/pull/13172




[GitHub] [kafka] tinaselenge commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "tinaselenge (via GitHub)" <gi...@apache.org>.
tinaselenge commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1446056568

   Created a Jira issue for adding an integration test for this tool later on: https://issues.apache.org/jira/browse/KAFKA-14763




[GitHub] [kafka] showuon commented on pull request #13172: KAFKA-14590: Move DelegationTokenCommand to tools

Posted by "showuon (via GitHub)" <gi...@apache.org>.
showuon commented on PR #13172:
URL: https://github.com/apache/kafka/pull/13172#issuecomment-1436957769

   > > @tinaselenge , thanks for the PR. Some questions:
   > > 
   > > 1. This PR creates a new `DelegationTokenCommand` class, but there's no old `DelegationTokenCommand` class removal. Why is that?
   > > 2. The original `DelegationTokenCommandTest` is an integration test, but now we changed to unit test by mockAdminClient, why do we change that?
   > > 
   > > Thanks.
   > 
   > Hi @showuon
   > 
   >     1. I have removed the existing Scala class and its test.
   > 
   >     2. I thought it's good enough to test it using the mock as it's not really doing anything specific with the cluster. I understand that changes the test behaviour. If you think we should test the tool against an integration test cluster, I'm happy to change it back. Please let me know.
   
   For (1), thanks for the update
   For (2), yes, I think the integration test is important. Especially now that KRaft is starting to support delegation tokens, we should rely on integration tests to make sure everything works in both ZK and KRaft modes. The mock client implementation is also a great addition, though, and can be kept as well.
   
   Thank you.

