You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rs...@apache.org on 2018/03/14 10:12:19 UTC

[kafka] branch trunk updated: MINOR: Improve Trogdor client logging. (#4675)

This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf8a4c2  MINOR: Improve Trogdor client logging. (#4675)
bf8a4c2 is described below

commit bf8a4c2ce7291e2f92fca297e970f668b4b85066
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Wed Mar 14 03:12:15 2018 -0700

    MINOR: Improve Trogdor client logging. (#4675)
    
    AgentClient and CoordinatorClient should have the option of logging failures to custom log4j objects.  There should also be builders for these objects, to make them easier to extend in the future.
    
    Reviewers: Rajini Sivaram <ra...@googlemail.com>
---
 .../apache/kafka/trogdor/agent/AgentClient.java    | 50 +++++++++++++++++--
 .../trogdor/coordinator/CoordinatorClient.java     | 56 ++++++++++++++++++----
 .../kafka/trogdor/coordinator/NodeManager.java     |  5 +-
 .../apache/kafka/trogdor/rest/JsonRestServer.java  | 55 +++++++++++++++++++--
 .../org/apache/kafka/trogdor/agent/AgentTest.java  | 18 ++++---
 .../kafka/trogdor/common/MiniTrogdorCluster.java   | 10 +++-
 6 files changed, 167 insertions(+), 27 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
index 9c06591..08769a0 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/agent/AgentClient.java
@@ -33,6 +33,8 @@ import org.apache.kafka.trogdor.rest.JsonRestServer;
 import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopWorkerRequest;
 import org.apache.kafka.trogdor.rest.StopWorkerResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -41,6 +43,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
  * A client for the Trogdor agent.
  */
 public class AgentClient {
+    private final Logger log;
+
     /**
      * The maximum number of tries to make.
      */
@@ -51,13 +55,46 @@ public class AgentClient {
      */
     private final String target;
 
-    public AgentClient(int maxTries, String host, int port) {
-        this(maxTries, String.format("%s:%d", host, port));
+    public static class Builder {
+        private Logger log = LoggerFactory.getLogger(AgentClient.class);
+        private int maxTries = 1;
+        private String target = null;
+
+        public Builder() {
+        }
+
+        public Builder log(Logger log) {
+            this.log = log;
+            return this;
+        }
+
+        public Builder maxTries(int maxTries) {
+            this.maxTries = maxTries;
+            return this;
+        }
+
+        public Builder target(String target) {
+            this.target = target;
+            return this;
+        }
+
+        public Builder target(String host, int port) {
+            this.target = String.format("%s:%d", host, port);
+            return this;
+        }
+
+        public AgentClient build() {
+            if (target == null) {
+                throw new RuntimeException("You must specify a target.");
+            }
+            return new AgentClient(log, maxTries, target);
+        }
     }
 
-    public AgentClient(int maxTries, String target) {
-        this.target = target;
+    private AgentClient(Logger log, int maxTries, String target) {
+        this.log = log;
         this.maxTries = maxTries;
+        this.target = target;
     }
 
     public String target() {
@@ -152,7 +189,10 @@ public class AgentClient {
             }
         }
         String target = res.getString("target");
-        AgentClient client = new AgentClient(3, target);
+        AgentClient client = new Builder().
+            maxTries(3).
+            target(target).
+            build();
         if (res.getBoolean("status")) {
             System.out.println("Got agent status: " +
                 JsonUtil.toPrettyJsonString(client.status()));
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
index 821a76b..870b64e 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/CoordinatorClient.java
@@ -34,6 +34,8 @@ import org.apache.kafka.trogdor.rest.JsonRestServer.HttpResponse;
 import org.apache.kafka.trogdor.rest.StopTaskRequest;
 import org.apache.kafka.trogdor.rest.StopTaskResponse;
 import org.apache.kafka.trogdor.rest.TasksResponse;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import static net.sourceforge.argparse4j.impl.Arguments.store;
 import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
@@ -42,6 +44,8 @@ import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
  * A client for the Trogdor coordinator.
  */
 public class CoordinatorClient {
+    private final Logger log;
+
     /**
      * The maximum number of tries to make.
      */
@@ -52,11 +56,44 @@ public class CoordinatorClient {
      */
     private final String target;
 
-    public CoordinatorClient(int maxTries, String host, int port) {
-        this(maxTries, String.format("%s:%d", host, port));
+    public static class Builder {
+        private Logger log = LoggerFactory.getLogger(CoordinatorClient.class);
+        private int maxTries = 1;
+        private String target = null;
+
+        public Builder() {
+        }
+
+        public Builder log(Logger log) {
+            this.log = log;
+            return this;
+        }
+
+        public Builder maxTries(int maxTries) {
+            this.maxTries = maxTries;
+            return this;
+        }
+
+        public Builder target(String target) {
+            this.target = target;
+            return this;
+        }
+
+        public Builder target(String host, int port) {
+            this.target = String.format("%s:%d", host, port);
+            return this;
+        }
+
+        public CoordinatorClient build() {
+            if (target == null) {
+                throw new RuntimeException("You must specify a target.");
+            }
+            return new CoordinatorClient(log, maxTries, target);
+        }
     }
 
-    public CoordinatorClient(int maxTries, String target) {
+    private CoordinatorClient(Logger log, int maxTries, String target) {
+        this.log = log;
         this.maxTries = maxTries;
         this.target = target;
     }
@@ -78,28 +115,28 @@ public class CoordinatorClient {
 
     public CreateTaskResponse createTask(CreateTaskRequest request) throws Exception {
         HttpResponse<CreateTaskResponse> resp =
-            JsonRestServer.<CreateTaskResponse>httpRequest(url("/coordinator/task/create"), "POST",
+            JsonRestServer.<CreateTaskResponse>httpRequest(log, url("/coordinator/task/create"), "POST",
                 request, new TypeReference<CreateTaskResponse>() { }, maxTries);
         return resp.body();
     }
 
     public StopTaskResponse stopTask(StopTaskRequest request) throws Exception {
         HttpResponse<StopTaskResponse> resp =
-            JsonRestServer.<StopTaskResponse>httpRequest(url("/coordinator/task/stop"), "PUT",
+            JsonRestServer.<StopTaskResponse>httpRequest(log, url("/coordinator/task/stop"), "PUT",
                 request, new TypeReference<StopTaskResponse>() { }, maxTries);
         return resp.body();
     }
 
     public TasksResponse tasks() throws Exception {
         HttpResponse<TasksResponse> resp =
-            JsonRestServer.<TasksResponse>httpRequest(url("/coordinator/tasks"), "GET",
+            JsonRestServer.<TasksResponse>httpRequest(log, url("/coordinator/tasks"), "GET",
                 null, new TypeReference<TasksResponse>() { }, maxTries);
         return resp.body();
     }
 
     public void shutdown() throws Exception {
         HttpResponse<Empty> resp =
-            JsonRestServer.<Empty>httpRequest(url("/coordinator/shutdown"), "PUT",
+            JsonRestServer.<Empty>httpRequest(log, url("/coordinator/shutdown"), "PUT",
                 null, new TypeReference<Empty>() { }, maxTries);
         resp.body();
     }
@@ -158,7 +195,10 @@ public class CoordinatorClient {
             }
         }
         String target = res.getString("target");
-        CoordinatorClient client = new CoordinatorClient(3, target);
+        CoordinatorClient client = new Builder().
+            maxTries(3).
+            target(target).
+            build();
         if (res.getBoolean("status")) {
             System.out.println("Got coordinator status: " +
                 JsonUtil.toPrettyJsonString(client.status()));
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
index fd871bc..0129007 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/coordinator/NodeManager.java
@@ -147,7 +147,10 @@ public final class NodeManager {
     NodeManager(Node node, TaskManager taskManager) {
         this.node = node;
         this.taskManager = taskManager;
-        this.client = new AgentClient(1, node.hostname(), Node.Util.getTrogdorAgentPort(node));
+        this.client = new AgentClient.Builder().
+            maxTries(1).
+            target(node.hostname(), Node.Util.getTrogdorAgentPort(node)).
+            build();
         this.workers = new HashMap<>();
         this.executor = Executors.newSingleThreadScheduledExecutor(
             ThreadUtils.createThreadFactory("NodeManager(" + node.name() + ")",
diff --git a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
index e61b7fe..ee8643b 100644
--- a/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
+++ b/tools/src/main/java/org/apache/kafka/trogdor/rest/JsonRestServer.java
@@ -162,6 +162,8 @@ public class JsonRestServer {
     }
 
     /**
+     * Make an HTTP request.
+     *
      * @param url               HTTP connection will be established with this url.
      * @param method            HTTP method ("GET", "POST", "PUT", etc.)
      * @param requestBodyData   Object to serialize as JSON and send in the request body.
@@ -170,12 +172,28 @@ public class JsonRestServer {
      * @return The deserialized response to the HTTP request, or null if no data is expected.
      */
     public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
-                                    TypeReference<T> responseFormat) throws IOException {
+                                                  TypeReference<T> responseFormat) throws IOException {
+        return httpRequest(log, url, method, requestBodyData, responseFormat);
+    }
+
+    /**
+     * Make an HTTP request.
+     *
+     * @param logger            The logger to use.
+     * @param url               HTTP connection will be established with this url.
+     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
+     * @param requestBodyData   Object to serialize as JSON and send in the request body.
+     * @param responseFormat    Expected format of the response to the HTTP request.
+     * @param <T>               The type of the deserialized response to the HTTP request.
+     * @return The deserialized response to the HTTP request, or null if no data is expected.
+     */
+    public static <T> HttpResponse<T> httpRequest(Logger logger, String url, String method,
+            Object requestBodyData, TypeReference<T> responseFormat) throws IOException {
         HttpURLConnection connection = null;
         try {
             String serializedBody = requestBodyData == null ? null :
                 JsonUtil.JSON_SERDE.writeValueAsString(requestBodyData);
-            log.debug("Sending {} with input {} to {}", method, serializedBody, url);
+            logger.debug("Sending {} with input {} to {}", method, serializedBody, url);
             connection = (HttpURLConnection) new URL(url).openConnection();
             connection.setRequestMethod(method);
             connection.setRequestProperty("User-Agent", "kafka");
@@ -225,7 +243,34 @@ public class JsonRestServer {
         }
     }
 
-    public static <T> HttpResponse<T> httpRequest(String url, String method,
+    /**
+     * Make an HTTP request with retries.
+     *
+     * @param url               HTTP connection will be established with this url.
+     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
+     * @param requestBodyData   Object to serialize as JSON and send in the request body.
+     * @param responseFormat    Expected format of the response to the HTTP request.
+     * @param <T>               The type of the deserialized response to the HTTP request.
+     * @return The deserialized response to the HTTP request, or null if no data is expected.
+     */
+    public static <T> HttpResponse<T> httpRequest(String url, String method, Object requestBodyData,
+                                                  TypeReference<T> responseFormat, int maxTries)
+            throws IOException, InterruptedException {
+        return httpRequest(log, url, method, requestBodyData, responseFormat, maxTries);
+    }
+
+    /**
+     * Make an HTTP request with retries.
+     *
+     * @param logger            The logger to use.
+     * @param url               HTTP connection will be established with this url.
+     * @param method            HTTP method ("GET", "POST", "PUT", etc.)
+     * @param requestBodyData   Object to serialize as JSON and send in the request body.
+     * @param responseFormat    Expected format of the response to the HTTP request.
+     * @param <T>               The type of the deserialized response to the HTTP request.
+     * @return The deserialized response to the HTTP request, or null if no data is expected.
+     */
+    public static <T> HttpResponse<T> httpRequest(Logger logger, String url, String method,
             Object requestBodyData, TypeReference<T> responseFormat, int maxTries)
             throws IOException, InterruptedException {
         IOException exc = null;
@@ -234,9 +279,9 @@ public class JsonRestServer {
                 Thread.sleep(tries > 1 ? 10 : 2);
             }
             try {
-                return httpRequest(url, method, requestBodyData, responseFormat);
+                return httpRequest(logger, url, method, requestBodyData, responseFormat);
             } catch (IOException e) {
-                log.info("{} {}: error: {}", method, url, e.getMessage());
+                logger.info("{} {}: error: {}", method, url, e.getMessage());
                 exc = e;
             }
         }
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
index b5fa001..30d13b5 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/agent/AgentTest.java
@@ -90,7 +90,8 @@ public class AgentTest {
     @Test
     public void testAgentProgrammaticShutdown() throws Exception {
         Agent agent = createAgent(Scheduler.SYSTEM);
-        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
         client.invokeShutdown();
         agent.waitForShutdown();
     }
@@ -98,7 +99,8 @@ public class AgentTest {
     @Test
     public void testAgentGetStatus() throws Exception {
         Agent agent = createAgent(Scheduler.SYSTEM);
-        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
         AgentStatusResponse status = client.status();
         assertEquals(agent.status(), status);
         agent.beginShutdown();
@@ -110,7 +112,8 @@ public class AgentTest {
         MockTime time = new MockTime(0, 0, 0);
         MockScheduler scheduler = new MockScheduler(time);
         Agent agent = createAgent(scheduler);
-        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
         AgentStatusResponse status = client.status();
         assertEquals(Collections.emptyMap(), status.workers());
         new ExpectedTasks().waitFor(client);
@@ -158,7 +161,8 @@ public class AgentTest {
         MockTime time = new MockTime(0, 0, 0);
         MockScheduler scheduler = new MockScheduler(time);
         Agent agent = createAgent(scheduler);
-        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
         new ExpectedTasks().waitFor(client);
 
         final NoOpTaskSpec fooSpec = new NoOpTaskSpec(10, 2);
@@ -213,7 +217,8 @@ public class AgentTest {
         MockTime time = new MockTime(0, 0, 0);
         MockScheduler scheduler = new MockScheduler(time);
         Agent agent = createAgent(scheduler);
-        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
         new ExpectedTasks().waitFor(client);
 
         SampleTaskSpec fooSpec = new SampleTaskSpec(0, 900000, 1, "");
@@ -273,7 +278,8 @@ public class AgentTest {
         MockTime time = new MockTime(0, 0, 0);
         MockScheduler scheduler = new MockScheduler(time);
         Agent agent = createAgent(scheduler);
-        AgentClient client = new AgentClient(10, "localhost", agent.port());
+        AgentClient client = new AgentClient.Builder().
+            maxTries(10).target("localhost", agent.port()).build();
         new ExpectedTasks().waitFor(client);
 
         try (MockKibosh mockKibosh = new MockKibosh()) {
diff --git a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
index b180c02..ea55f2a 100644
--- a/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
+++ b/tools/src/test/java/org/apache/kafka/trogdor/common/MiniTrogdorCluster.java
@@ -235,7 +235,10 @@ public class MiniTrogdorCluster implements AutoCloseable {
         if (coordinator == null) {
             throw new RuntimeException("No coordinator configured.");
         }
-        return new CoordinatorClient(10, "localhost", coordinator.port());
+        return new CoordinatorClient.Builder().
+            maxTries(10).
+            target("localhost", coordinator.port()).
+            build();
     }
 
     public AgentClient agentClient(String nodeName) {
@@ -243,7 +246,10 @@ public class MiniTrogdorCluster implements AutoCloseable {
         if (agent == null) {
             throw new RuntimeException("No agent configured on node " + nodeName);
         }
-        return new AgentClient(10, "localhost", agent.port());
+        return new AgentClient.Builder().
+            maxTries(10).
+            target("localhost", agent.port()).
+            build();
     }
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
rsivaram@apache.org.