You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xi...@apache.org on 2020/12/09 00:37:21 UTC
[incubator-pinot] 01/01: Adding minion client
This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch adding-minion-clients
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit 35e781224a08fd7b32cee67f5c7d24a186998e76
Author: Xiang Fu <fx...@gmail.com>
AuthorDate: Tue Dec 8 16:22:05 2020 -0800
Adding minion client
---
.../apache/pinot/common/minion/MinionClient.java | 115 +++++++++++++++++++++
.../common/minion/MinionRequestURLBuilder.java | 83 +++++++++++++++
.../pinot/common/minion/MinionClientTest.java | 91 ++++++++++++++++
3 files changed, 289 insertions(+)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
new file mode 100644
index 0000000..88bcc39
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionClient.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.commons.httpclient.HttpException;
+import org.apache.commons.io.IOUtils;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * MinionClient is the client-side APIs for Pinot Controller tasks APIs.
+ */
+public class MinionClient {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(MinionClient.class);
+ private static final CloseableHttpClient HTTP_CLIENT = HttpClientBuilder.create().build();
+ private static final String ACCEPT = "accept";
+ private static final String APPLICATION_JSON = "application/json";
+ private static final String HTTP = "http";
+
+ private final String _controllerUrl;
+
+ public MinionClient(String controllerHost, String controllerPort) {
+ this(HTTP, controllerHost, controllerPort);
+ }
+
+ public MinionClient(String scheme, String controllerHost, String controllerPort) {
+ this(String.format("%s://%s:%s", scheme, controllerHost, controllerPort));
+ }
+
+ public MinionClient(String controllerUrl) {
+ _controllerUrl = controllerUrl;
+ }
+
+ public String getControllerUrl() {
+ return _controllerUrl;
+ }
+
+ public Map<String, String> scheduleMinionTasks()
+ throws IOException {
+ HttpPost httpPost = createHttpPostRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskSchedule());
+ HttpResponse response = HTTP_CLIENT.execute(httpPost);
+ int statusCode = response.getStatusLine().getStatusCode();
+ final String responseString = IOUtils.toString(response.getEntity().getContent());
+ if (statusCode >= 400) {
+ throw new HttpException(String
+ .format("Unable to schedule minion tasks. Error code %d, Error message: %s", statusCode, responseString));
+ }
+ return JsonUtils.stringToObject(responseString, Map.class);
+ }
+
+ public Map<String, String> getTasksStates(String taskType)
+ throws IOException {
+ HttpGet httpGet =
+ createHttpGetRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTasksStates(taskType));
+ HttpResponse response = HTTP_CLIENT.execute(httpGet);
+ int statusCode = response.getStatusLine().getStatusCode();
+ final String responseString = IOUtils.toString(response.getEntity().getContent());
+ if (statusCode >= 400) {
+ throw new HttpException(String
+ .format("Unable to get tasks states map. Error code %d, Error message: %s", statusCode, responseString));
+ }
+ return JsonUtils.stringToObject(responseString, Map.class);
+ }
+
+ public String getTaskState(String taskName)
+ throws IOException {
+ HttpGet httpGet = createHttpGetRequest(MinionRequestURLBuilder.baseUrl(getControllerUrl()).forTaskState(taskName));
+ HttpResponse response = HTTP_CLIENT.execute(httpGet);
+ int statusCode = response.getStatusLine().getStatusCode();
+ String responseString = IOUtils.toString(response.getEntity().getContent());
+ if (statusCode >= 400) {
+ throw new HttpException(
+ String.format("Unable to get task state. Error code %d, Error message: %s", statusCode, responseString));
+ }
+ return responseString;
+ }
+
+ private HttpGet createHttpGetRequest(String uri) {
+ HttpGet httpGet = new HttpGet(uri);
+ httpGet.setHeader(ACCEPT, APPLICATION_JSON);
+ return httpGet;
+ }
+
+ private HttpPost createHttpPostRequest(String uri) {
+ HttpPost httpPost = new HttpPost(uri);
+ httpPost.setHeader(ACCEPT, APPLICATION_JSON);
+ return httpPost;
+ }
+}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
new file mode 100644
index 0000000..4432bc9
--- /dev/null
+++ b/pinot-common/src/main/java/org/apache/pinot/common/minion/MinionRequestURLBuilder.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.utils.StringUtil;
+
+
+/**
+ * MinionRequestURLBuilder is the helper class to generate URLs for task APIs.
+ */
+public class MinionRequestURLBuilder {
+
+ private final String _baseUrl;
+
+ private MinionRequestURLBuilder(String baseUrl) {
+ _baseUrl = StringUtils.removeEnd(baseUrl, "/");
+ }
+
+ public static MinionRequestURLBuilder baseUrl(String baseUrl) {
+ return new MinionRequestURLBuilder(baseUrl);
+ }
+
+ public String forTaskSchedule() {
+ return StringUtil.join("/", _baseUrl, "tasks/schedule");
+ }
+
+ public String forListAllTasks(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType, "tasks");
+ }
+
+ public String forListAllTaskTypes() {
+ return StringUtil.join("/", _baseUrl, "tasks/tasktypes");
+ }
+
+ public String forTaskTypeState(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType, "state");
+ }
+
+ public String forTasksStates(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType, "taskstates");
+ }
+
+ public String forTaskState(String taskName) {
+ return StringUtil.join("/", _baseUrl, "tasks/task", taskName, "state");
+ }
+
+ public String forTaskConfig(String taskName) {
+ return StringUtil.join("/", _baseUrl, "tasks/task", taskName, "config");
+ }
+
+ public String forTaskTypeCleanup(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType, "clenaup");
+ }
+
+ public String forTaskTypeStop(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType, "stop");
+ }
+
+ public String forTaskTypeResume(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType, "resume");
+ }
+
+ public String forTaskTypeDelete(String taskType) {
+ return StringUtil.join("/", _baseUrl, "tasks", taskType);
+ }
+}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
new file mode 100644
index 0000000..869dde1
--- /dev/null
+++ b/pinot-common/src/test/java/org/apache/pinot/common/minion/MinionClientTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.minion;
+
+import com.sun.net.httpserver.HttpHandler;
+import com.sun.net.httpserver.HttpServer;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class MinionClientTest {
+ private static final Logger LOGGER = LoggerFactory.getLogger(MinionClientTest.class);
+
+ private HttpHandler createHandler(int status, String msg, int sleepTimeMs) {
+ return httpExchange -> {
+ if (sleepTimeMs > 0) {
+ try {
+ Thread.sleep(sleepTimeMs);
+ } catch (InterruptedException e) {
+ LOGGER.warn("Handler interrupted during sleep");
+ }
+ }
+ httpExchange.sendResponseHeaders(status, msg.length());
+ OutputStream responseBody = httpExchange.getResponseBody();
+ responseBody.write(msg.getBytes());
+ responseBody.close();
+ };
+ }
+
+ private HttpServer startServer(int port, String path, HttpHandler handler)
+ throws IOException {
+ HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
+ server.createContext(path, handler);
+ new Thread(() -> server.start()).start();
+ return server;
+ }
+
+ @Test
+ public void testTaskSchedule()
+ throws IOException {
+ HttpServer httpServer = startServer(14202, "/tasks/schedule",
+ createHandler(200, "{\"SegmentGenerationAndPushTask\":\"Task_SegmentGenerationAndPushTask_1607470525615\"}",
+ 0));
+ MinionClient minionClient = new MinionClient("localhost", "14202");
+ Assert.assertEquals(minionClient.scheduleMinionTasks().get("SegmentGenerationAndPushTask"),
+ "Task_SegmentGenerationAndPushTask_1607470525615");
+ httpServer.stop(0);
+ }
+
+ @Test
+ public void testTasksStates()
+ throws IOException {
+ HttpServer httpServer = startServer(14203, "/tasks/SegmentGenerationAndPushTask/taskstates",
+ createHandler(200, "{\"Task_SegmentGenerationAndPushTask_1607470525615\":\"IN_PROGRESS\"}", 0));
+ MinionClient minionClient = new MinionClient("http", "localhost", "14203");
+ Assert.assertEquals(minionClient.getTasksStates("SegmentGenerationAndPushTask")
+ .get("Task_SegmentGenerationAndPushTask_1607470525615"), "IN_PROGRESS");
+ httpServer.stop(0);
+ }
+
+ @Test
+ public void testTaskState()
+ throws IOException {
+ HttpServer httpServer = startServer(14204, "/tasks/task/Task_SegmentGenerationAndPushTask_1607470525615/state",
+ createHandler(200, "\"COMPLETED\"", 0));
+ MinionClient minionClient = new MinionClient("http://localhost:14204");
+ Assert.assertEquals(minionClient.getTaskState("Task_SegmentGenerationAndPushTask_1607470525615"), "\"COMPLETED\"");
+ httpServer.stop(0);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org