You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/29 03:50:30 UTC
[kylin] branch master updated: KYLIN-4613 add build cubev2 and
JobRestClient
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new fe04ca8 KYLIN-4613 add build cubev2 and JobRestClient
fe04ca8 is described below
commit fe04ca8d3be6c50742246663d6fd6767a568bcf8
Author: chuxiao <ch...@didichuxing.com>
AuthorDate: Wed Jun 5 21:45:52 2019 +0800
KYLIN-4613 add build cubev2 and JobRestClient
---
.../java/org/apache/kylin/common/KylinConfig.java | 16 +-
.../apache/kylin/common/restclient/RestClient.java | 16 +-
.../org/apache/kylin/job/util/JobRestClient.java | 133 +++++++++++++++++
.../apache/kylin/tool/job/CubeBuildingCLIV2.java | 161 +++++++++++++++++++++
4 files changed, 316 insertions(+), 10 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 30cf71c..58b1418 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -134,7 +134,7 @@ public class KylinConfig extends KylinConfigBase {
}
}
- public static KylinConfig getInstanceFromEnv() {
+ public static KylinConfig getInstanceFromEnv(boolean allowConfigFileNoExist) {
synchronized (KylinConfig.class) {
KylinConfig config = THREAD_ENV_INSTANCE.get();
if (config != null) {
@@ -148,7 +148,15 @@ public class KylinConfig extends KylinConfigBase {
buildDefaultOrderedProperties();
config = new KylinConfig();
- config.reloadKylinConfig(buildSiteProperties());
+ try {
+ config.reloadKylinConfig(buildSiteProperties());
+ } catch (KylinConfigCannotInitException e) {
+ logger.info("Kylin Config Can not Init Exception");
+ if (!allowConfigFileNoExist) {
+ throw e;
+ }
+ }
+
VersionUtil.loadKylinVersion();
logger.info("Initialized a new KylinConfig from getInstanceFromEnv : {}",
System.identityHashCode(config));
@@ -161,6 +169,10 @@ public class KylinConfig extends KylinConfigBase {
}
}
+ public static KylinConfig getInstanceFromEnv() {
+ return getInstanceFromEnv(false);
+ }
+
// Only used in test cases!!!
public static void destroyInstance() {
synchronized (KylinConfig.class) {
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index cc96418..fcd8706 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -61,10 +61,10 @@ import com.google.common.base.Strings;
public class RestClient {
private static final Logger logger = LoggerFactory.getLogger(RestClient.class);
- private static final String UTF_8 = "UTF-8";
+ protected static final String UTF_8 = "UTF-8";
private static final String APPLICATION_JSON = "application/json";
- private static final String INVALID_RESPONSE = "Invalid response ";
- private static final String CUBES = "/cubes/";
+ protected static final String INVALID_RESPONSE = "Invalid response ";
+ protected static final String CUBES = "/cubes/";
private static final String WITH_URL = " with url ";
protected static Pattern fullRestPattern = Pattern.compile("(?:([^:]+)[:]([^@]+)[@])?([^:]+)(?:[:](\\d+))?");
@@ -142,7 +142,7 @@ public class RestClient {
HttpConnectionParams.setConnectionTimeout(httpParams, httpConnectionTimeoutMs);
final PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
+ KylinConfig config = KylinConfig.getInstanceFromEnv(true);
cm.setDefaultMaxPerRoute(config.getRestClientDefaultMaxPerRoute());
cm.setMaxTotal(config.getRestClientMaxTotal());
@@ -434,13 +434,13 @@ public class RestClient {
return post;
}
- private HttpPut newPut(String url) {
+ protected HttpPut newPut(String url) {
HttpPut put = new HttpPut(url);
addHttpHeaders(put);
return put;
}
- private HttpGet newGet(String url) {
+ protected HttpGet newGet(String url) {
HttpGet get = new HttpGet(url);
addHttpHeaders(get);
return get;
@@ -467,7 +467,7 @@ public class RestClient {
}
}
- private String getContent(HttpResponse response) throws IOException {
+ protected String getContent(HttpResponse response) throws IOException {
StringBuffer result = new StringBuffer();
try (BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
@@ -480,7 +480,7 @@ public class RestClient {
return result.toString();
}
- private void cleanup(HttpRequestBase request, HttpResponse response) {
+ protected void cleanup(HttpRequestBase request, HttpResponse response) {
try {
if (response != null)
EntityUtils.consume(response.getEntity());
diff --git a/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java b/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java
new file mode 100644
index 0000000..9b52f35
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.util;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.JobInstance;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class JobRestClient extends RestClient {
+ private static final String JOBS = "/jobs/";
+
+ public JobRestClient(String host, int port, String userName, String password) {
+ this(host, port, userName, password, null, null);
+ }
+
+ public JobRestClient(String host, int port, String userName, String password, Integer httpConnectionTimeoutMs, Integer httpSocketTimeoutMs) {
+ super(host, port, userName, password, httpConnectionTimeoutMs, httpSocketTimeoutMs);
+ }
+
+ public JobInstance buildCubeV2(String cubeName, long startTime, long endTime, CubeBuildTypeEnum buildType) throws IOException {
+ String url = baseUrl + CUBES + cubeName + "/build";
+ HttpPut put = newPut(url);
+ HttpResponse response = null;
+ try {
+ HashMap<String, String> paraMap = new HashMap<String, String>();
+ paraMap.put("startTime", startTime + "");
+ paraMap.put("endTime", endTime + "");
+ paraMap.put("buildType", buildType.toString());
+ String jsonMsg = new ObjectMapper().writeValueAsString(paraMap);
+ put.setEntity(new StringEntity(jsonMsg, UTF_8));
+ response = client.execute(put);
+ String result = getContent(response);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+ + result + " with build cube url " + url + "\n" + jsonMsg);
+ } else {
+ return json2JobInstance(result);
+ }
+ } finally {
+ cleanup(put, response);
+ }
+ }
+
+ protected JobInstance json2JobInstance(String json) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+ JobInstance jobInstance = mapper.readValue(json, JobInstance.class);
+ return jobInstance;
+ }
+
+ public JobInstance getJobStatus(String jobId) throws IOException {
+ String url = baseUrl + JOBS + jobId;
+ HttpGet get = newGet(url);
+ HttpResponse response = null;
+ try {
+ response = client.execute(get);
+ String result = getContent(response);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+ + result + " with get job status " + jobId);
+ } else {
+ return json2JobInstance(result);
+ }
+ } finally {
+ cleanup(get, response);
+ }
+ }
+
+ public String JobInstance2JsonString(JobInstance jobInstance) throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ String jsonString = mapper.writeValueAsString(jobInstance);
+ return jsonString;
+ }
+
+ public JobInstance resumeJob(String jobId) throws IOException {
+ String url = baseUrl + JOBS + jobId + "/resume";
+ HttpPut put = newPut(url);
+ HttpResponse response = null;
+ try {
+ response = client.execute(put);
+ String result = getContent(response);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+ + result + " with resume job " + jobId);
+ } else {
+ return json2JobInstance(result);
+ }
+ } finally {
+ cleanup(put, response);
+ }
+ }
+
+ public void discardJob(String jobId) throws IOException {
+ String url = baseUrl + JOBS + jobId + "/cancel";
+ HttpPut put = newPut(url);
+ HttpResponse response = null;
+ try {
+ response = client.execute(put);
+ String result = getContent(response);
+ if (response.getStatusLine().getStatusCode() != 200) {
+ throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+ + result + " with discard job " + jobId);
+ }
+ } finally {
+ cleanup(put, response);
+ }
+ }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java
new file mode 100644
index 0000000..bb66db2
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.tool.job;
+
+import com.google.common.base.Strings;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.util.JobRestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class CubeBuildingCLIV2 extends AbstractApplication {
+
+ private static final Logger logger = LoggerFactory.getLogger(CubeBuildingCLIV2.class);
+
+ private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(true)
+ .withDescription("Specify for which cube to build").create("cube");
+ private static final Option OPTION_BUILD_TYPE = OptionBuilder.withArgName("buildType").hasArg().isRequired(false)
+ .withDescription("Specify for the build type").create("buildType");
+ private static final Option OPTION_TIME_START = OptionBuilder.withArgName("startTime").hasArg().isRequired(false)
+ .withDescription("Specify the start time of the segment").create("startTime");
+ private static final Option OPTION_TIME_END = OptionBuilder.withArgName("endTime").hasArg().isRequired(true)
+ .withDescription("Specify the end time of the segment").create("endTime");
+ private static final Option OPTION_HOST = OptionBuilder.withArgName("host").hasArg().isRequired(true)
+ .withDescription("Specify the kylin server host").create("host");
+ private static final Option OPTION_PORT = OptionBuilder.withArgName("port").hasArg().isRequired(true)
+ .withDescription("Specify the kylin server port").create("port");
+ private static final Option OPTION_USER_NAME = OptionBuilder.withArgName("userName").hasArg().isRequired(true)
+ .withDescription("Specify the kylin server user name").create("userName");
+ private static final Option OPTION_PASSWORD= OptionBuilder.withArgName("password").hasArg().isRequired(true)
+ .withDescription("Specify the kylin server password").create("password");
+ private static final Option OPTION_WAITING_FOR_END = OptionBuilder.withArgName("waitingForEnd").hasArg().isRequired(false)
+ .withDescription("Specify whether waiting for end").create("waitingForEnd");
+ private static final Option OPTION_RETRY_NUMBER = OptionBuilder.withArgName("retryNumber").hasArg().isRequired(false)
+ .withDescription("Specify retry number when execute failed").create("retryNumber");
+ private static final Option OPTION_DISCARD_ERROR_JOB = OptionBuilder.withArgName("discardErrorJob").hasArg().isRequired(false)
+ .withDescription("Specify discard job when execute failed").create("discardErrorJob");
+
+ private final Options options;
+
+ public CubeBuildingCLIV2() {
+ options = new Options();
+ options.addOption(OPTION_CUBE);
+ options.addOption(OPTION_BUILD_TYPE);
+ options.addOption(OPTION_TIME_START);
+ options.addOption(OPTION_TIME_END);
+ options.addOption(OPTION_HOST);
+ options.addOption(OPTION_PORT);
+ options.addOption(OPTION_USER_NAME);
+ options.addOption(OPTION_PASSWORD);
+ options.addOption(OPTION_WAITING_FOR_END);
+ options.addOption(OPTION_RETRY_NUMBER);
+ options.addOption(OPTION_DISCARD_ERROR_JOB);
+ }
+
+ protected Options getOptions() {
+ return options;
+ }
+
+ protected void execute(OptionsHelper optionsHelper) throws IOException {
+ String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+ String buildType = optionsHelper.getOptionValue(OPTION_BUILD_TYPE);
+ if (Strings.isNullOrEmpty(buildType)) {
+ buildType = "BUILD";
+ }
+ Long startTime = 0L;
+ if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_TIME_START))) {
+ startTime = Long.parseLong(optionsHelper.getOptionValue(OPTION_TIME_START));
+ }
+ Long endTime = Long.parseLong(optionsHelper.getOptionValue(OPTION_TIME_END));
+ String host = optionsHelper.getOptionValue(OPTION_HOST);
+ Integer port = Integer.parseInt(optionsHelper.getOptionValue(OPTION_PORT));
+
+ String userName = optionsHelper.getOptionValue(OPTION_USER_NAME);
+ String password = optionsHelper.getOptionValue(OPTION_PASSWORD);
+
+ Boolean waitingForEnd = true;
+ if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_WAITING_FOR_END))) {
+ waitingForEnd = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_WAITING_FOR_END));
+ }
+ JobRestClient client = new JobRestClient(host, port, userName, password);
+ System.out.println("start building cube.");
+ JobInstance jobInstance = submitJob(client, cubeName, startTime, endTime, buildType);
+ if (waitingForEnd) {
+ int retryNumber = 0;
+ if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_RETRY_NUMBER))) {
+ retryNumber = Integer.parseInt(optionsHelper.getOptionValue(OPTION_RETRY_NUMBER));
+ }
+ while (!jobInstance.getStatus().isComplete()) {
+ try {
+ Thread.sleep(30000);
+ } catch (InterruptedException e) {
+ System.err.println("Thread interrupted, exit");
+ System.exit(-1);
+ }
+ jobInstance = client.getJobStatus(jobInstance.getId());
+ System.out.println("job " + jobInstance.getId() + " get status : " + jobInstance.getStatus());
+ if (jobInstance.getStatus().equals(JobStatusEnum.ERROR) && retryNumber > 0) {
+ System.out.println("retry count is " + retryNumber);
+ retryNumber--;
+ jobInstance = client.resumeJob(jobInstance.getId());
+ }
+ }
+ if (!jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
+ boolean discardErrorJob = false;
+ if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_DISCARD_ERROR_JOB))) {
+ discardErrorJob = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DISCARD_ERROR_JOB));
+ }
+ if (discardErrorJob) {
+ client.discardJob(jobInstance.getId());
+ }
+ System.exit(-1);
+ }
+
+ }
+ }
+
+ private JobInstance submitJob(JobRestClient client, String cubeName, long startDate, long endDate, String buildType) throws IOException {
+ CubeBuildTypeEnum buildTypeEnum = CubeBuildTypeEnum.valueOf(buildType);
+ JobInstance jobInstance = client.buildCubeV2(cubeName, startDate, endDate, buildTypeEnum);
+ System.out.println("building cube job:");
+ System.out.println(client.JobInstance2JsonString(jobInstance));
+ return jobInstance;
+ }
+
+
+ public static void main(String[] args) {
+ CubeBuildingCLIV2 cli = new CubeBuildingCLIV2();
+ try {
+ cli.execute(args);
+ System.exit(0);
+ } catch (Exception e) {
+ logger.error("error running cube building", e);
+ System.exit(-1);
+ }
+ }
+}