You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/09 00:37:21 UTC

[incubator-pinot] 01/01: Adding minion client

This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch adding-minion-clients
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 35e781224a08fd7b32cee67f5c7d24a186998e76
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Dec 8 16:22:05 2020 -0800

    Adding minion client
---
 .../apache/pinot/common/minion/MinionClient.java   | 115 +++++++++++++++++++++
 .../common/minion/MinionRequestURLBuilder.java     |  83 +++++++++++++++
 .../pinot/common/minion/MinionClientTest.java      |  91 ++++++++++++++++
 3 files changed, 289 insertions(+)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
new file mode 100644
index 0000000..88bcc39
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import org.apache.commons.httpclient.HttpException;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpEntity;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MinionClient is a client for the task REST APIs exposed by the Pinot Controller.
+ *
+ * <p>Each response is read completely and then closed before a method returns, so the connection held by
+ * the shared HTTP client is released after every call.
+ */
+public class MinionClient {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(MinionClient.class);
+  private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
+  private static final String ACCEPT = "accept";
+  private static final String APPLICATION_JSON = "application/json";
+  private static final String HTTP = "http";
+
+  private final String _controllerUrl;
+
+  /** Creates a client for {@code http://<controllerHost>:<controllerPort>}. */
+  public MinionClient(String controllerHost, String controllerPort) {
+    this(HTTP, controllerHost, controllerPort);
+  }
+
+  /** Creates a client for {@code <scheme>://<controllerHost>:<controllerPort>}. */
+  public MinionClient(String scheme, String controllerHost, String controllerPort) {
+    this(String.format("%s://%s:%s", scheme, controllerHost, controllerPort));
+  }
+
+  /** Creates a client for the given controller base URL, e.g. {@code http://localhost:9000}. */
+  public MinionClient(String controllerUrl) {
+    _controllerUrl = controllerUrl;
+  }
+
+  /** Returns the controller base URL that requests are sent to. */
+  public String getControllerUrl() {
+    return _controllerUrl;
+  }
+
+  /**
+   * Sends a POST request to the controller's task schedule endpoint.
+   *
+   * @return the JSON object in the response body, parsed into a map
+   * @throws IOException if the request fails or the controller responds with a status code of 400 or above
+   */
+  public Map<String, String> scheduleMinionTasks()
+      throws IOException {
+    HttpPost httpPost = createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule());
+    return parseStringMap(executeRequest(httpPost, "schedule minion tasks"));
+  }
+
+  /**
+   * Fetches the task states for the given task type.
+   *
+   * @param taskType task type used to build the request URL
+   * @return the JSON object in the response body, parsed into a map
+   * @throws IOException if the request fails or the controller responds with a status code of 400 or above
+   */
+  public Map<String, String> getTasksStates(String taskType)
+      throws IOException {
+    HttpGet httpGet =
+        createHttpGetRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTasksStates(taskType));
+    return parseStringMap(executeRequest(httpGet, "get tasks states map"));
+  }
+
+  /**
+   * Fetches the state of a single task.
+   *
+   * @param taskName task name used to build the request URL
+   * @return the response body exactly as received; it is not JSON-decoded
+   * @throws IOException if the request fails or the controller responds with a status code of 400 or above
+   */
+  public String getTaskState(String taskName)
+      throws IOException {
+    HttpGet httpGet = createHttpGetRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskState(taskName));
+    return executeRequest(httpGet, "get task state");
+  }
+
+  /**
+   * Executes the request and returns the response body, closing the response in all cases.
+   *
+   * @param request request to execute
+   * @param action description of the operation, inserted into the error message
+   * @return the response body as text
+   * @throws HttpException if the response status code is 400 or above
+   */
+  private static String executeRequest(HttpUriRequest request, String action)
+      throws IOException {
+    // try-with-resources closes the response (and releases its connection) even when an exception is thrown.
+    try (CloseableHttpResponse response = HTTP_CLIENT.execute(request)) {
+      int statusCode = response.getStatusLine().getStatusCode();
+      String responseString = readBody(response);
+      if (statusCode >= 400) {
+        throw new HttpException(
+            String.format("Unable to %s. Error code %d, Error message: %s", action, statusCode, responseString));
+      }
+      return responseString;
+    }
+  }
+
+  /** Reads the response body as UTF-8 text; a response without an entity yields an empty string. */
+  private static String readBody(HttpResponse response)
+      throws IOException {
+    HttpEntity entity = response.getEntity();
+    if (entity == null) {
+      return "";
+    }
+    return IOUtils.toString(entity.getContent(), StandardCharsets.UTF_8);
+  }
+
+  /** Parses a JSON object into a map. */
+  @SuppressWarnings("unchecked") // Only the raw Map class token can be passed here.
+  private static Map<String, String> parseStringMap(String json)
+      throws IOException {
+    return JsonUtils.stringToObject(json, Map.class);
+  }
+
+  /** Builds a GET request for the URI that asks for a JSON response. */
+  private HttpGet createHttpGetRequest(String uri) {
+    HttpGet httpGet = new HttpGet(uri);
+    httpGet.setHeader(ACCEPT, APPLICATION_JSON);
+    return httpGet;
+  }
+
+  /** Builds a POST request for the URI that asks for a JSON response. */
+  private HttpPost createHttpPostRequest(String uri) {
+    HttpPost httpPost = new HttpPost(uri);
+    httpPost.setHeader(ACCEPT, APPLICATION_JSON);
+    return httpPost;
+  }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
new file mode 100644
index 0000000..4432bc9
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.StringUtil;
+
+
+/**
+ * MinionRequestURLBuilder is the helper class to generate URLs for the task APIs.
+ *
+ * <p>A trailing "/" on the base URL is removed once, at construction, and every URL is built by joining
+ * the base URL and the endpoint path segments with "/".
+ */
+public class MinionRequestURLBuilder {
+
+  private final String _baseUrl;
+
+  private MinionRequestURLBuilder(String baseUrl) {
+    _baseUrl = StringUtils.removeEnd(baseUrl, "/");
+  }
+
+  /** Creates a builder for the given base URL; a trailing "/" is dropped. */
+  public static MinionRequestURLBuilder baseUrl(String baseUrl) {
+    return new MinionRequestURLBuilder(baseUrl);
+  }
+
+  /** Returns {@code <baseUrl>/tasks/schedule}. */
+  public String forTaskSchedule() {
+    return StringUtil.join("/", _baseUrl, "tasks/schedule");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/<taskType>/tasks}. */
+  public String forListAllTasks(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType, "tasks");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/tasktypes}. */
+  public String forListAllTaskTypes() {
+    return StringUtil.join("/", _baseUrl, "tasks/tasktypes");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/<taskType>/state}. */
+  public String forTaskTypeState(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType, "state");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/<taskType>/taskstates}. */
+  public String forTasksStates(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType, "taskstates");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/task/<taskName>/state}. */
+  public String forTaskState(String taskName) {
+    return StringUtil.join("/", _baseUrl, "tasks/task", taskName, "state");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/task/<taskName>/config}. */
+  public String forTaskConfig(String taskName) {
+    return StringUtil.join("/", _baseUrl, "tasks/task", taskName, "config");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/<taskType>/cleanup}. */
+  public String forTaskTypeCleanup(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType, "cleanup");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/<taskType>/stop}. */
+  public String forTaskTypeStop(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType, "stop");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/<taskType>/resume}. */
+  public String forTaskTypeResume(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType, "resume");
+  }
+
+  /** Returns {@code <baseUrl>/tasks/<taskType>}. */
+  public String forTaskTypeDelete(String taskType) {
+    return StringUtil.join("/", _baseUrl, "tasks", taskType);
+  }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
new file mode 100644
index 0000000..869dde1
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.nio.charset.StandardCharsets;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests {@link MinionClient} against an in-process HTTP server that returns canned responses.
+ */
+public class MinionClientTest {
+  private static final Logger LOGGER = LoggerFactory.getLogger(MinionClientTest.class);
+
+  /** Creates a handler that optionally sleeps and then answers with the given status code and body. */
+  private HttpHandler createHandler(int status, String msg, int sleepTimeMs) {
+    return httpExchange -> {
+      if (sleepTimeMs > 0) {
+        try {
+          Thread.sleep(sleepTimeMs);
+        } catch (InterruptedException e) {
+          LOGGER.warn("Handler interrupted during sleep");
+          // Restore the interrupt flag so the calling thread can still observe the interruption.
+          Thread.currentThread().interrupt();
+        }
+      }
+      // The declared response length is a byte count, so derive it from the encoded body, not msg.length().
+      byte[] body = msg.getBytes(StandardCharsets.UTF_8);
+      httpExchange.sendResponseHeaders(status, body.length);
+      try (OutputStream responseBody = httpExchange.getResponseBody()) {
+        responseBody.write(body);
+      }
+    };
+  }
+
+  /** Starts an HTTP server on the given port (0 selects a free port) serving {@code path} with {@code handler}. */
+  private HttpServer startServer(int port, String path, HttpHandler handler)
+      throws IOException {
+    HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+    server.createContext(path, handler);
+    // HttpServer#start() launches its own background thread and returns, so no extra thread is needed.
+    server.start();
+    return server;
+  }
+
+  @Test
+  public void testTaskSchedule()
+      throws IOException {
+    HttpServer httpServer = startServer(0, "/tasks/schedule",
+        createHandler(200, "{\"SegmentGenerationAndPushTask\":\"Task_SegmentGenerationAndPushTask_1607470525615\"}",
+            0));
+    try {
+      int port = httpServer.getAddress().getPort();
+      MinionClient minionClient = new MinionClient("localhost", String.valueOf(port));
+      Assert.assertEquals(minionClient.scheduleMinionTasks().get("SegmentGenerationAndPushTask"),
+          "Task_SegmentGenerationAndPushTask_1607470525615");
+    } finally {
+      // Always stop the server so the port is released even when the assertion fails.
+      httpServer.stop(0);
+    }
+  }
+
+  @Test
+  public void testTasksStates()
+      throws IOException {
+    HttpServer httpServer = startServer(0, "/tasks/SegmentGenerationAndPushTask/taskstates",
+        createHandler(200, "{\"Task_SegmentGenerationAndPushTask_1607470525615\":\"IN_PROGRESS\"}", 0));
+    try {
+      int port = httpServer.getAddress().getPort();
+      MinionClient minionClient = new MinionClient("http", "localhost", String.valueOf(port));
+      Assert.assertEquals(minionClient.getTasksStates("SegmentGenerationAndPushTask")
+          .get("Task_SegmentGenerationAndPushTask_1607470525615"), "IN_PROGRESS");
+    } finally {
+      // Always stop the server so the port is released even when the assertion fails.
+      httpServer.stop(0);
+    }
+  }
+
+  @Test
+  public void testTaskState()
+      throws IOException {
+    HttpServer httpServer = startServer(0, "/tasks/task/Task_SegmentGenerationAndPushTask_1607470525615/state",
+        createHandler(200, "\"COMPLETED\"", 0));
+    try {
+      int port = httpServer.getAddress().getPort();
+      MinionClient minionClient = new MinionClient("http://localhost:" + port);
+      Assert.assertEquals(minionClient.getTaskState("Task_SegmentGenerationAndPushTask_1607470525615"),
+          "\"COMPLETED\"");
+    } finally {
+      // Always stop the server so the port is released even when the assertion fails.
+      httpServer.stop(0);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org