You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/12/29 03:50:30 UTC

[kylin] branch master updated: KYLIN-4613 add build cubev2 and JobRestClient

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new fe04ca8  KYLIN-4613 add build cubev2 and JobRestClient
fe04ca8 is described below

commit fe04ca8d3be6c50742246663d6fd6767a568bcf8
Author: chuxiao <ch...@didichuxing.com>
AuthorDate: Wed Jun 5 21:45:52 2019 +0800

    KYLIN-4613 add build cubev2 and JobRestClient
---
 .../java/org/apache/kylin/common/KylinConfig.java  |  16 +-
 .../apache/kylin/common/restclient/RestClient.java |  16 +-
 .../org/apache/kylin/job/util/JobRestClient.java   | 133 +++++++++++++++++
 .../apache/kylin/tool/job/CubeBuildingCLIV2.java   | 161 +++++++++++++++++++++
 4 files changed, 316 insertions(+), 10 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index 30cf71c..58b1418 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -134,7 +134,7 @@ public class KylinConfig extends KylinConfigBase {
         }
     }
 
-    public static KylinConfig getInstanceFromEnv() {
+    public static KylinConfig getInstanceFromEnv(boolean allowConfigFileNoExist) {
         synchronized (KylinConfig.class) {
             KylinConfig config = THREAD_ENV_INSTANCE.get();
             if (config != null) {
@@ -148,7 +148,15 @@ public class KylinConfig extends KylinConfigBase {
                     buildDefaultOrderedProperties();
 
                     config = new KylinConfig();
-                    config.reloadKylinConfig(buildSiteProperties());
+                    try {
+                        config.reloadKylinConfig(buildSiteProperties());
+                    } catch (KylinConfigCannotInitException e) {
+                        logger.info("Kylin Config Can not Init Exception");
+                        if (!allowConfigFileNoExist) {
+                            throw e;
+                        }
+                    }
+
                     VersionUtil.loadKylinVersion();
                     logger.info("Initialized a new KylinConfig from getInstanceFromEnv : {}",
                             System.identityHashCode(config));
@@ -161,6 +169,10 @@ public class KylinConfig extends KylinConfigBase {
         }
     }
 
+    public static KylinConfig getInstanceFromEnv() { // strict variant of getInstanceFromEnv(boolean)
+        return getInstanceFromEnv(false); // false: a KylinConfigCannotInitException is rethrown, not tolerated
+    }
+
     // Only used in test cases!!!
     public static void destroyInstance() {
         synchronized (KylinConfig.class) {
diff --git a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
index cc96418..fcd8706 100644
--- a/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
+++ b/core-common/src/main/java/org/apache/kylin/common/restclient/RestClient.java
@@ -61,10 +61,10 @@ import com.google.common.base.Strings;
 public class RestClient {
 
     private static final Logger logger = LoggerFactory.getLogger(RestClient.class);
-    private static final String UTF_8 = "UTF-8";
+    protected static final String UTF_8 = "UTF-8";
     private static final String APPLICATION_JSON = "application/json";
-    private static final String INVALID_RESPONSE = "Invalid response ";
-    private static final String CUBES = "/cubes/";
+    protected static final String INVALID_RESPONSE = "Invalid response ";
+    protected static final String CUBES = "/cubes/";
     private static final String WITH_URL = " with url ";
 
     protected static Pattern fullRestPattern = Pattern.compile("(?:([^:]+)[:]([^@]+)[@])?([^:]+)(?:[:](\\d+))?");
@@ -142,7 +142,7 @@ public class RestClient {
         HttpConnectionParams.setConnectionTimeout(httpParams, httpConnectionTimeoutMs);
 
         final PoolingClientConnectionManager cm = new PoolingClientConnectionManager();
-        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        KylinConfig config = KylinConfig.getInstanceFromEnv(true);
         cm.setDefaultMaxPerRoute(config.getRestClientDefaultMaxPerRoute());
         cm.setMaxTotal(config.getRestClientMaxTotal());
 
@@ -434,13 +434,13 @@ public class RestClient {
         return post;
     }
 
-    private HttpPut newPut(String url) {
+    protected HttpPut newPut(String url) {
         HttpPut put = new HttpPut(url);
         addHttpHeaders(put);
         return put;
     }
 
-    private HttpGet newGet(String url) {
+    protected HttpGet newGet(String url) {
         HttpGet get = new HttpGet(url);
         addHttpHeaders(get);
         return get;
@@ -467,7 +467,7 @@ public class RestClient {
         }
     }
 
-    private String getContent(HttpResponse response) throws IOException {
+    protected String getContent(HttpResponse response) throws IOException {
         StringBuffer result = new StringBuffer();
         try (BufferedReader rd = new BufferedReader(
                 new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8))) {
@@ -480,7 +480,7 @@ public class RestClient {
         return result.toString();
     }
 
-    private void cleanup(HttpRequestBase request, HttpResponse response) {
+    protected void cleanup(HttpRequestBase request, HttpResponse response) {
         try {
             if (response != null)
                 EntityUtils.consume(response.getEntity());
diff --git a/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java b/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java
new file mode 100644
index 0000000..9b52f35
--- /dev/null
+++ b/core-job/src/main/java/org/apache/kylin/job/util/JobRestClient.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.job.util;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.http.HttpResponse;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.JobInstance;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+public class JobRestClient extends RestClient {
+    private static final String JOBS = "/jobs/";
+
+    public JobRestClient(String host, int port, String userName, String password) {
+        this(host, port, userName, password, null, null);
+    }
+
+    public JobRestClient(String host, int port, String userName, String password, Integer httpConnectionTimeoutMs, Integer httpSocketTimeoutMs) {
+        super(host, port, userName, password, httpConnectionTimeoutMs, httpSocketTimeoutMs);
+    }
+
+    public JobInstance buildCubeV2(String cubeName, long startTime, long endTime, CubeBuildTypeEnum buildType) throws IOException {
+        String url = baseUrl + CUBES + cubeName + "/build";
+        HttpPut put = newPut(url);
+        HttpResponse response = null;
+        try {
+            HashMap<String, String> paraMap = new HashMap<String, String>();
+            paraMap.put("startTime", startTime + "");
+            paraMap.put("endTime", endTime + "");
+            paraMap.put("buildType", buildType.toString());
+            String jsonMsg = new ObjectMapper().writeValueAsString(paraMap);
+            put.setEntity(new StringEntity(jsonMsg, UTF_8));
+            response = client.execute(put);
+            String result = getContent(response);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+                        + result + " with build cube url " + url + "\n" + jsonMsg);
+            } else {
+                return json2JobInstance(result);
+            }
+        } finally {
+            cleanup(put, response);
+        }
+    }
+
+    protected JobInstance json2JobInstance(String json) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
+        JobInstance jobInstance = mapper.readValue(json, JobInstance.class);
+        return jobInstance;
+    }
+
+    public JobInstance getJobStatus(String jobId) throws IOException {
+        String url = baseUrl + JOBS + jobId;
+        HttpGet get = newGet(url);
+        HttpResponse response = null;
+        try {
+            response = client.execute(get);
+            String result = getContent(response);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+                        + result + " with get job status " + jobId);
+            } else {
+                return json2JobInstance(result);
+            }
+        } finally {
+            cleanup(get, response);
+        }
+    }
+
+    public String JobInstance2JsonString(JobInstance jobInstance) throws IOException {
+        ObjectMapper mapper = new ObjectMapper();
+        String jsonString = mapper.writeValueAsString(jobInstance);
+        return jsonString;
+    }
+
+    public JobInstance resumeJob(String jobId) throws IOException {
+        String url = baseUrl + JOBS + jobId + "/resume";
+        HttpPut put = newPut(url);
+        HttpResponse response = null;
+        try {
+            response = client.execute(put);
+            String result = getContent(response);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+                        + result + " with resume job " + jobId);
+            } else {
+                return json2JobInstance(result);
+            }
+        } finally {
+            cleanup(put, response);
+        }
+    }
+
+    public void discardJob(String jobId) throws IOException {
+        String url = baseUrl + JOBS + jobId + "/cancel";
+        HttpPut put = newPut(url);
+        HttpResponse response = null;
+        try {
+            response = client.execute(put);
+            String result = getContent(response);
+            if (response.getStatusLine().getStatusCode() != 200) {
+                throw new IOException(INVALID_RESPONSE + response.getStatusLine().getStatusCode()
+                        + result + " with discard job " + jobId);
+            }
+        } finally {
+            cleanup(put, response);
+        }
+    }
+}
diff --git a/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java
new file mode 100644
index 0000000..bb66db2
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/job/CubeBuildingCLIV2.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.tool.job;
+
+import com.google.common.base.Strings;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.model.CubeBuildTypeEnum;
+import org.apache.kylin.job.JobInstance;
+import org.apache.kylin.job.constant.JobStatusEnum;
+import org.apache.kylin.job.util.JobRestClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class CubeBuildingCLIV2 extends AbstractApplication {
+
+    private static final Logger logger = LoggerFactory.getLogger(CubeBuildingCLIV2.class);
+
+    private static final Option OPTION_CUBE = OptionBuilder.withArgName("cube").hasArg().isRequired(true)
+            .withDescription("Specify for which cube to build").create("cube");
+    private static final Option OPTION_BUILD_TYPE = OptionBuilder.withArgName("buildType").hasArg().isRequired(false)
+            .withDescription("Specify for the build type").create("buildType");
+    private static final Option OPTION_TIME_START = OptionBuilder.withArgName("startTime").hasArg().isRequired(false)
+            .withDescription("Specify the start time of the segment").create("startTime");
+    private static final Option OPTION_TIME_END = OptionBuilder.withArgName("endTime").hasArg().isRequired(true)
+            .withDescription("Specify the end time of the segment").create("endTime");
+    private static final Option OPTION_HOST = OptionBuilder.withArgName("host").hasArg().isRequired(true)
+            .withDescription("Specify the kylin server host").create("host");
+    private static final Option OPTION_PORT = OptionBuilder.withArgName("port").hasArg().isRequired(true)
+            .withDescription("Specify the kylin server port").create("port");
+    private static final Option OPTION_USER_NAME = OptionBuilder.withArgName("userName").hasArg().isRequired(true)
+            .withDescription("Specify the kylin server user name").create("userName");
+    private static final Option OPTION_PASSWORD= OptionBuilder.withArgName("password").hasArg().isRequired(true)
+            .withDescription("Specify the kylin server password").create("password");
+    private static final Option OPTION_WAITING_FOR_END = OptionBuilder.withArgName("waitingForEnd").hasArg().isRequired(false)
+            .withDescription("Specify whether waiting for end").create("waitingForEnd");
+    private static final Option OPTION_RETRY_NUMBER = OptionBuilder.withArgName("retryNumber").hasArg().isRequired(false)
+            .withDescription("Specify retry number when execute failed").create("retryNumber");
+    private static final Option OPTION_DISCARD_ERROR_JOB = OptionBuilder.withArgName("discardErrorJob").hasArg().isRequired(false)
+            .withDescription("Specify discard job when execute failed").create("discardErrorJob");
+
+    private final Options options;
+
+    public CubeBuildingCLIV2() {
+        options = new Options();
+        options.addOption(OPTION_CUBE);
+        options.addOption(OPTION_BUILD_TYPE);
+        options.addOption(OPTION_TIME_START);
+        options.addOption(OPTION_TIME_END);
+        options.addOption(OPTION_HOST);
+        options.addOption(OPTION_PORT);
+        options.addOption(OPTION_USER_NAME);
+        options.addOption(OPTION_PASSWORD);
+        options.addOption(OPTION_WAITING_FOR_END);
+        options.addOption(OPTION_RETRY_NUMBER);
+        options.addOption(OPTION_DISCARD_ERROR_JOB);
+    }
+
+    protected Options getOptions() {
+        return options;
+    }
+
+    protected void execute(OptionsHelper optionsHelper) throws IOException {
+        String cubeName = optionsHelper.getOptionValue(OPTION_CUBE);
+        String buildType = optionsHelper.getOptionValue(OPTION_BUILD_TYPE);
+        if (Strings.isNullOrEmpty(buildType)) {
+            buildType = "BUILD";
+        }
+        Long startTime = 0L;
+        if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_TIME_START))) {
+            startTime = Long.parseLong(optionsHelper.getOptionValue(OPTION_TIME_START));
+        }
+        Long endTime = Long.parseLong(optionsHelper.getOptionValue(OPTION_TIME_END));
+        String host = optionsHelper.getOptionValue(OPTION_HOST);
+        Integer port = Integer.parseInt(optionsHelper.getOptionValue(OPTION_PORT));
+
+        String userName = optionsHelper.getOptionValue(OPTION_USER_NAME);
+        String password = optionsHelper.getOptionValue(OPTION_PASSWORD);
+
+        Boolean waitingForEnd = true;
+        if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_WAITING_FOR_END))) {
+            waitingForEnd = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_WAITING_FOR_END));
+        }
+        JobRestClient client = new JobRestClient(host, port, userName, password);
+        System.out.println("start building cube.");
+        JobInstance jobInstance = submitJob(client, cubeName, startTime, endTime, buildType);
+        if (waitingForEnd) {
+            int retryNumber = 0;
+            if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_RETRY_NUMBER))) {
+                retryNumber = Integer.parseInt(optionsHelper.getOptionValue(OPTION_RETRY_NUMBER));
+            }
+            while (!jobInstance.getStatus().isComplete()) {
+                try {
+                    Thread.sleep(30000);
+                } catch (InterruptedException e) {
+                    System.err.println("Thread interrupted, exit");
+                    System.exit(-1);
+                }
+                jobInstance = client.getJobStatus(jobInstance.getId());
+                System.out.println("job " + jobInstance.getId() + " get status : " + jobInstance.getStatus());
+                if (jobInstance.getStatus().equals(JobStatusEnum.ERROR) && retryNumber > 0) {
+                    System.out.println("retry count is " + retryNumber);
+                    retryNumber--;
+                    jobInstance = client.resumeJob(jobInstance.getId());
+                }
+            }
+            if (!jobInstance.getStatus().equals(JobStatusEnum.FINISHED)) {
+                boolean discardErrorJob = false;
+                if (!Strings.isNullOrEmpty(optionsHelper.getOptionValue(OPTION_DISCARD_ERROR_JOB))) {
+                    discardErrorJob = Boolean.parseBoolean(optionsHelper.getOptionValue(OPTION_DISCARD_ERROR_JOB));
+                }
+                if (discardErrorJob) {
+                    client.discardJob(jobInstance.getId());
+                }
+                System.exit(-1);
+            }
+
+        }
+    }
+
+    private JobInstance submitJob(JobRestClient client, String cubeName, long startDate, long endDate, String buildType) throws IOException {
+        CubeBuildTypeEnum buildTypeEnum = CubeBuildTypeEnum.valueOf(buildType);
+        JobInstance jobInstance = client.buildCubeV2(cubeName, startDate, endDate, buildTypeEnum);
+        System.out.println("building cube job:");
+        System.out.println(client.JobInstance2JsonString(jobInstance));
+        return jobInstance;
+    }
+
+
+    public static void main(String[] args) {
+        CubeBuildingCLIV2 cli = new CubeBuildingCLIV2();
+        try {
+            cli.execute(args);
+            System.exit(0);
+        } catch (Exception e) {
+            logger.error("error running cube building", e);
+            System.exit(-1);
+        }
+    }
+}