You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2016/02/11 13:49:51 UTC
[12/51] [partial] kylin git commit: KYLIN-1416 keep only website in
document branch
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index 2813596..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
- protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
-
- protected StringBuilder output;
- protected int exitCode;
- protected JobStepStatusEnum status;
-
- public ShellCmdOutput() {
- init();
- }
-
- private void init() {
- output = new StringBuilder();
- exitCode = -1;
- status = JobStepStatusEnum.NEW;
- }
-
- @Override
- public JobStepStatusEnum getStatus() {
- return status;
- }
-
- @Override
- public void setStatus(JobStepStatusEnum s) {
- this.status = s;
- }
-
- @Override
- public String getOutput() {
- return output.toString();
- }
-
- @Override
- public void appendOutput(String message) {
- output.append(message).append(System.getProperty("line.separator"));
- log.debug(message);
- }
-
- @Override
- public int getExitCode() {
- return exitCode;
- }
-
- @Override
- public void setExitCode(int code) {
- exitCode = code;
- }
-
- @Override
- public void reset() {
- init();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java b/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
deleted file mode 100644
index 873607c..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @author xduo
- *
- */
-public class HadoopCmdOutput {
-
- protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
-
- private final StringBuilder output;
- private final Job job;
-
- public HadoopCmdOutput(Job job, StringBuilder output) {
- super();
- this.job = job;
- this.output = output;
- }
-
- public String getMrJobId() {
- return getInfo().get(ExecutableConstants.MR_JOB_ID);
- }
-
- public Map<String, String> getInfo() {
- if (job != null) {
- Map<String, String> status = new HashMap<String, String>();
- if (null != job.getJobID()) {
- status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
- }
- if (null != job.getTrackingURL()) {
- status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString());
- }
- return status;
- } else {
- return Collections.emptyMap();
- }
- }
-
- private String mapInputRecords;
- private String hdfsBytesWritten;
- private String hdfsBytesRead;
-
- public String getMapInputRecords() {
- return mapInputRecords;
- }
-
- public String getHdfsBytesWritten() {
- return hdfsBytesWritten;
- }
-
- public String getHdfsBytesRead() {
- return hdfsBytesRead;
- }
-
- public void updateJobCounter() {
- try {
- Counters counters = job.getCounters();
- if (counters == null) {
- String errorMsg = "no counters for job " + getMrJobId();
- log.warn(errorMsg);
- output.append(errorMsg);
- return;
- }
- this.output.append(counters.toString()).append("\n");
- log.debug(counters.toString());
-
- mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
- hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
- hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
- } catch (Exception e) {
- log.error(e.getLocalizedMessage(), e);
- output.append(e.getLocalizedMessage());
-
- mapInputRecords = "0";
- hdfsBytesWritten = "0";
- hdfsBytesRead = "0";
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
deleted file mode 100644
index dc412ce..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Created by qianzhou on 12/26/14.
- */
-public class HadoopShellExecutable extends AbstractExecutable {
-
- private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS";
- private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS";
-
- public HadoopShellExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final String mapReduceJobClass = getJobClass();
- String params = getJobParams();
- Preconditions.checkNotNull(mapReduceJobClass);
- Preconditions.checkNotNull(params);
- try {
- final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
- final AbstractHadoopJob job = constructor.newInstance();
- String[] args = params.trim().split("\\s+");
- logger.info("parameters of the HadoopShellExecutable:");
- logger.info(params);
- int result;
- StringBuilder log = new StringBuilder();
- try {
- result = ToolRunner.run(job, args);
- } catch (Exception ex) {
- logger.error("error execute " + this.toString(), ex);
- StringWriter stringWriter = new StringWriter();
- ex.printStackTrace(new PrintWriter(stringWriter));
- log.append(stringWriter.toString()).append("\n");
- result = 2;
- }
- log.append("result code:").append(result);
- return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()) : new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
- } catch (ReflectiveOperationException e) {
- logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } catch (Exception e) {
- logger.error("error execute " + this.toString(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) {
- setParam(KEY_MR_JOB, clazzName.getName());
- }
-
- public String getJobClass() throws ExecuteException {
- return getParam(KEY_MR_JOB);
- }
-
- public void setJobParams(String param) {
- setParam(KEY_PARAMS, param);
- }
-
- public String getJobParams() {
- return getParam(KEY_PARAMS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
deleted file mode 100644
index 75c461b..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.util.HiveClient;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.datanucleus.store.types.backed.HashMap;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.Lists;
-
-/**
- * Created by qianzhou on 1/15/15.
- */
-public class HqlExecutable extends AbstractExecutable {
-
- private static final String HQL = "hql";
- private static final String HIVE_CONFIG = "hive-config";
-
- public HqlExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- try {
- Map<String, String> configMap = getConfiguration();
- HiveClient hiveClient = new HiveClient(configMap);
-
- for (String hql : getHqls()) {
- hiveClient.executeHQL(hql);
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED);
- } catch (Exception e) {
- logger.error("error run hive query:" + getHqls(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setConfiguration(Map<String, String> configMap) {
- if (configMap != null) {
- String configStr = "";
- try {
- configStr = JsonUtil.writeValueAsString(configMap);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- setParam(HIVE_CONFIG, configStr);
- }
- }
-
- @SuppressWarnings("unchecked")
- private Map<String, String> getConfiguration() {
- String configStr = getParam(HIVE_CONFIG);
- Map<String, String> result = null;
- if (configStr != null) {
- try {
- result = JsonUtil.readValue(configStr, HashMap.class);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- return result;
- }
-
- public void setHqls(List<String> hqls) {
- setParam(HQL, StringUtils.join(hqls, ";"));
- }
-
- private List<String> getHqls() {
- final String hqls = getParam(HQL);
- if (hqls != null) {
- return Lists.newArrayList(StringUtils.split(hqls, ";"));
- } else {
- return Collections.emptyList();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
deleted file mode 100644
index cb6e76c..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.yarn.conf.HAUtil;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.RMHAUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.tools.HadoopStatusChecker;
-
-import com.google.common.base.Preconditions;
-
-/**
- * Created by qianzhou on 12/25/14.
- */
-public class MapReduceExecutable extends AbstractExecutable {
-
- public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
- private static final String KEY_MR_JOB = "MR_JOB_CLASS";
- private static final String KEY_PARAMS = "MR_JOB_PARAMS";
-
- public MapReduceExecutable() {
- super();
- }
-
- @Override
- protected void onExecuteStart(ExecutableContext executableContext) {
- final Output output = executableManager.getOutput(getId());
- if (output.getExtra().containsKey(START_TIME)) {
- final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
- if (mrJobId == null) {
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
- return;
- }
- try {
- Configuration conf = HadoopUtil.getCurrentConfiguration();
- Job job = new Cluster(conf).getJob(JobID.forName(mrJobId));
- if (job.getJobState() == JobStatus.State.FAILED) {
- //remove previous mr job info
- super.onExecuteStart(executableContext);
- } else {
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
- }
- } catch (IOException e) {
- logger.warn("error get hadoop status");
- super.onExecuteStart(executableContext);
- } catch (InterruptedException e) {
- logger.warn("error get hadoop status");
- super.onExecuteStart(executableContext);
- }
- } else {
- super.onExecuteStart(executableContext);
- }
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final String mapReduceJobClass = getMapReduceJobClass();
- String params = getMapReduceParams();
- Preconditions.checkNotNull(mapReduceJobClass);
- Preconditions.checkNotNull(params);
- try {
- Job job;
- final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
- if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
- Configuration conf = HadoopUtil.getCurrentConfiguration();
- job = new Cluster(conf).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
- logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
- } else {
- final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
- final AbstractHadoopJob hadoopJob = constructor.newInstance();
- hadoopJob.setConf(HadoopUtil.getCurrentConfiguration());
- hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
- logger.info("parameters of the MapReduceExecutable:");
- logger.info(params);
- String[] args = params.trim().split("\\s+");
- try {
- //for async mr job, ToolRunner just return 0;
- ToolRunner.run(hadoopJob, args);
- } catch (Exception ex) {
- StringBuilder log = new StringBuilder();
- logger.error("error execute " + this.toString(), ex);
- StringWriter stringWriter = new StringWriter();
- ex.printStackTrace(new PrintWriter(stringWriter));
- log.append(stringWriter.toString()).append("\n");
- log.append("result code:").append(2);
- return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
- }
- job = hadoopJob.getJob();
- }
- final StringBuilder output = new StringBuilder();
- final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
-
- final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
- if (restStatusCheckUrl == null) {
- logger.error("restStatusCheckUrl is null");
- return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
- }
- String mrJobId = hadoopCmdOutput.getMrJobId();
- HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output, context.getConfig());
- JobStepStatusEnum status = JobStepStatusEnum.NEW;
- while (!isDiscarded()) {
- JobStepStatusEnum newStatus = statusChecker.checkStatus();
- if (status == JobStepStatusEnum.KILLED) {
- executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String> emptyMap(), "killed by admin");
- return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
- }
- if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
- final long waitTime = System.currentTimeMillis() - getStartTime();
- setMapReduceWaitTime(waitTime);
- }
- status = newStatus;
- executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
- if (status.isComplete()) {
- hadoopCmdOutput.updateJobCounter();
- final Map<String, String> info = hadoopCmdOutput.getInfo();
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
- executableManager.addJobInfo(getId(), info);
-
- if (status == JobStepStatusEnum.FINISHED) {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
- } else {
- return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
- }
- }
- Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
- }
-
- // try to kill running map-reduce job to release resources.
- if (job != null) {
- try {
- job.killJob();
- } catch (Exception e) {
- logger.warn("failed to kill hadoop job: " + job.getJobID(), e);
- }
- }
- return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
-
- } catch (ReflectiveOperationException e) {
- logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } catch (Exception e) {
- logger.error("error execute " + this.toString(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- private String getRestStatusCheckUrl(Job job, KylinConfig config) {
- final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
- if (yarnStatusCheckUrl != null) {
- return yarnStatusCheckUrl;
- } else {
- logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
- }
- String rmWebHost = HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, job.getConfiguration());
- if (HAUtil.isHAEnabled(job.getConfiguration())) {
- YarnConfiguration conf = new YarnConfiguration(job.getConfiguration());
- String active = RMHAUtils.findActiveRMHAId(conf);
- rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf);
- }
- if (StringUtils.isEmpty(rmWebHost)) {
- return null;
- }
- if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
- //do nothing
- } else {
- rmWebHost = "http://" + rmWebHost;
- }
- logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
- return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
- }
-
- public long getMapReduceWaitTime() {
- return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
- }
-
- public void setMapReduceWaitTime(long t) {
- addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
- }
-
- public String getMapReduceJobClass() throws ExecuteException {
- return getParam(KEY_MR_JOB);
- }
-
- public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
- setParam(KEY_MR_JOB, clazzName.getName());
- }
-
- public String getMapReduceParams() {
- return getParam(KEY_PARAMS);
- }
-
- public void setMapReduceParams(String param) {
- setParam(KEY_PARAMS, param);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
deleted file mode 100644
index 19aa915..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-import com.google.common.collect.Maps;
-
-/**
- * Created by qianzhou on 12/26/14.
- */
-public class ShellExecutable extends AbstractExecutable {
-
- private static final String CMD = "cmd";
-
- public ShellExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- try {
- logger.info("executing:" + getCmd());
- final ShellExecutableLogger logger = new ShellExecutableLogger();
- final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
- executableManager.addJobInfo(getId(), logger.getInfo());
- return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond());
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setCmd(String cmd) {
- setParam(CMD, cmd);
- }
-
- public String getCmd() {
- return getParam(CMD);
- }
-
- private static class ShellExecutableLogger implements Logger {
-
- private final Map<String, String> info = Maps.newHashMap();
-
- private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
- private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
- private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
- private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
-
- // hive
- private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
- private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
- @Override
- public void log(String message) {
- Matcher matcher = PATTERN_APP_ID.matcher(message);
- if (matcher.find()) {
- String appId = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_ID, appId);
- }
-
- matcher = PATTERN_APP_URL.matcher(message);
- if (matcher.find()) {
- String appTrackingUrl = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
- }
-
- matcher = PATTERN_JOB_ID.matcher(message);
- if (matcher.find()) {
- String mrJobID = matcher.group(1);
- info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
- }
-
- matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- String hdfsWritten = matcher.group(1);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
- if (matcher.find()) {
- String sourceCount = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
- if (matcher.find()) {
- String sourceSize = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
- }
-
- // hive
- matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
- if (matcher.find()) {
- String jobId = matcher.group(1);
- String trackingUrl = matcher.group(2);
- info.put(ExecutableConstants.MR_JOB_ID, jobId);
- info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
- }
-
- matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- // String hdfsRead = matcher.group(1);
- String hdfsWritten = matcher.group(2);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
- }
-
- Map<String, String> getInfo() {
- return info;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
deleted file mode 100644
index 38f4a87..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
/**
 * Configuration keys, counter settings and limits shared by the batch build jobs.
 * All members are interface constants, so they are implicitly public, static and final.
 *
 * @author George Song (ysong1)
 */
public interface BatchConstants {

    // ---- intermediate flat table ----

    /** Row delimiter of the intermediate table: code point 127. */
    char INTERMEDIATE_TABLE_ROW_DELIMITER = (char) 127;
    String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";

    // ---- cube identification ----

    String CFG_CUBE_NAME = "cube.name";
    String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
    String CFG_CUBE_CUBOID_LEVEL = "cube.cuboid.level";

    // ---- II identification ----

    String CFG_II_NAME = "ii.name";
    String CFG_II_SEGMENT_NAME = "ii.segment.name";

    // ---- job input / output ----

    String OUTPUT_PATH = "output.path";
    String TABLE_NAME = "table.name";
    String TABLE_COLUMNS = "table.columns";

    // ---- MapReduce counters ----

    String MAPREDUCE_COUNTER_GROUP_NAME = "Cube Builder";

    // ---- sampling and region sizing ----

    String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
    String REGION_NUMBER = "region.number";
    String REGION_NUMBER_MIN = "region.number.min";
    String REGION_NUMBER_MAX = "region.number.max";
    String REGION_SPLIT_SIZE = "region.split.size";
    String CUBE_CAPACITY = "cube.capacity";

    // ---- temporary directories (local and HDFS currently share the same path) ----

    String CFG_KYLIN_LOCAL_TEMP_DIR = "/tmp/kylin/";
    String CFG_KYLIN_HDFS_TEMP_DIR = "/tmp/kylin/";

    // ---- limits ----

    int COUNTER_MAX = 100_000;
    int ERROR_RECORD_THRESHOLD = 100;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
deleted file mode 100644
index 3d98b0b..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-/**
- * Created by qianzhou on 1/5/15.
- */
-public final class ExecutableConstants {
-
- private ExecutableConstants() {
- }
-
- public static final String YARN_APP_ID = "yarn_application_id";
-
- public static final String YARN_APP_URL = "yarn_application_tracking_url";
- public static final String MR_JOB_ID = "mr_job_id";
- public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
- public static final String SOURCE_RECORDS_COUNT = "source_records_count";
- public static final String SOURCE_RECORDS_SIZE = "source_records_size";
- public static final String GLOBAL_LISTENER_NAME = "ChainListener";
-
- public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
-
- public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
-
- public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
- public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
-
- public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
- public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
- public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
- public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
- public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
- public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
- public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
- public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
- public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
- public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
- public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
- public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
-
- public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
- public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
-
- public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
- public static final String PROP_JOB_FLOW = "jobFlow";
- public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
- public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
- public static final String PROP_COMMAND = "command";
- // public static final String PROP_STORAGE_LOCATION =
- // "storageLocationIdentifier";
- public static final String PROP_JOB_ASYNC = "jobAsync";
- public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
- public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
- public static final String PROP_JOB_KILLED = "jobKilled";
- public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
- public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Source Records Count: ${source_records_count}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
deleted file mode 100644
index 9c8f083..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-public enum JobStatusEnum {
-
- NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
-
- private final int code;
-
- private JobStatusEnum(int statusCode) {
- this.code = statusCode;
- }
-
- public static JobStatusEnum getByCode(int statusCode) {
- for (JobStatusEnum status : values()) {
- if (status.getCode() == statusCode) {
- return status;
- }
- }
-
- return null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public boolean isComplete() {
- return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
deleted file mode 100644
index 0e4c18e..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author xduo, ysong1
- *
- */
-public enum JobStepCmdTypeEnum {
- SHELL_CMD, SHELL_CMD_HADOOP, JAVA_CMD_HADOOP_FACTDISTINCT, JAVA_CMD_HADOOP_BASECUBOID, JAVA_CMD_HADOOP_NDCUBOID, JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, JAVA_CMD_HADOOP_CONVERTHFILE, JAVA_CMD_HADOOP_MERGECUBOID, JAVA_CMD_HADOOP_NO_MR_DICTIONARY, JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, JAVA_CMD_HADOOP_NO_MR_BULKLOAD
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
deleted file mode 100644
index fbcfd97..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.constant;
-
-public enum JobStepStatusEnum {
- NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64);
-
- private final int code;
-
- private JobStepStatusEnum(int statusCode) {
- this.code = statusCode;
- }
-
- public static JobStepStatusEnum getByCode(int statusCode) {
- for (JobStepStatusEnum status : values()) {
- if (status.getCode() == statusCode) {
- return status;
- }
- }
-
- return null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public boolean isComplete() {
- return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode();
- }
-
- public boolean isRunable() {
- return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
deleted file mode 100644
index 182196c..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJob.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Date;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-
-/**
- * Created by qianzhou on 12/25/14.
- */
-public class CubingJob extends DefaultChainedExecutable {
-
- public CubingJob() {
- super();
- }
-
- private static final String CUBE_INSTANCE_NAME = "cubeName";
- private static final String SEGMENT_ID = "segmentId";
- public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-
- public void setCubeName(String name) {
- setParam(CUBE_INSTANCE_NAME, name);
- }
-
- public String getCubeName() {
- return getParam(CUBE_INSTANCE_NAME);
- }
-
- void setSegmentIds(List<String> segmentIds) {
- setParam(SEGMENT_ID, StringUtils.join(segmentIds, ","));
- }
-
- void setSegmentId(String segmentId) {
- setParam(SEGMENT_ID, segmentId);
- }
-
- public String getSegmentIds() {
- return getParam(SEGMENT_ID);
- }
-
- @Override
- protected Pair<String, String> formatNotifications(ExecutableState state) {
- final Output output = jobService.getOutput(getId());
- String logMsg = "";
- switch (output.getState()) {
- case ERROR:
- logMsg = output.getVerboseMsg();
- break;
- case DISCARDED:
- break;
- case SUCCEED:
- break;
- default:
- return null;
- }
- String content = ExecutableConstants.NOTIFY_EMAIL_TEMPLATE;
- content = content.replaceAll("\\$\\{job_name\\}", getName());
- content = content.replaceAll("\\$\\{result\\}", state.toString());
- content = content.replaceAll("\\$\\{cube_name\\}", getCubeName());
- content = content.replaceAll("\\$\\{source_records_count\\}", StringUtil.noBlank(getSourceRecordCount(), "0"));
- content = content.replaceAll("\\$\\{start_time\\}", new Date(getStartTime()).toString());
- content = content.replaceAll("\\$\\{duration\\}", getDuration() / 60000 + "mins");
- content = content.replaceAll("\\$\\{mr_waiting\\}", getMapReduceWaitTime() / 60000 + "mins");
- content = content.replaceAll("\\$\\{last_update_time\\}", new Date(getLastModified()).toString());
- content = content.replaceAll("\\$\\{submitter\\}", StringUtil.noBlank(getSubmitter(), "missing submitter"));
- content = content.replaceAll("\\$\\{error_log\\}", StringUtil.noBlank(logMsg, "no error log"));
-
- try {
- InetAddress inetAddress = InetAddress.getLocalHost();
- content = content.replaceAll("\\$\\{job_engine\\}", inetAddress.getCanonicalHostName());
- } catch (UnknownHostException e) {
- logger.warn(e.getLocalizedMessage(), e);
- }
-
- String title = "[" + state.toString() + "] - [Kylin Cube Build Job]-" + getCubeName();
- return Pair.of(title, content);
- }
-
- @Override
- protected void onExecuteFinished(ExecuteResult result, ExecutableContext executableContext) {
- long time = 0L;
- for (AbstractExecutable task : getTasks()) {
- final ExecutableState status = task.getStatus();
- if (status != ExecutableState.SUCCEED) {
- break;
- }
- if (task instanceof MapReduceExecutable) {
- time += ((MapReduceExecutable) task).getMapReduceWaitTime();
- }
- }
- setMapReduceWaitTime(time);
- super.onExecuteFinished(result, executableContext);
- }
-
- public long getMapReduceWaitTime() {
- return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
- }
-
- public void setMapReduceWaitTime(long t) {
- addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
- }
-
-
- public final String getSourceRecordCount() {
- for (AbstractExecutable task : getTasks()) {
- if (ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID.equals(task.getName())) {
- return getExtraInfo(task.getOutput(), ExecutableConstants.SOURCE_RECORDS_COUNT);
- }
- }
- return "N/A";
- }
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java b/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
deleted file mode 100644
index 80c030f..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/CubingJobBuilder.java
+++ /dev/null
@@ -1,476 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.job.AbstractJobBuilder;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
-import org.apache.kylin.job.hadoop.hive.CubeJoinedFlatTableDesc;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- * Created by qianzhou on 12/25/14.
- */
-public final class CubingJobBuilder extends AbstractJobBuilder {
-
- public CubingJobBuilder(JobEngineConfig engineConfig) {
- super(engineConfig);
- }
-
- public CubingJob buildJob(CubeSegment seg) {
- checkPreconditions(seg);
-
- final CubingJob result = initialJob(seg, "BUILD");
- final String jobId = result.getId();
- final String cuboidRootPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
- final List<String> toDeletePaths = Lists.newArrayList();
-
- // cubing
- Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(seg, cuboidRootPath, result, toDeletePaths);
- String intermediateHiveTableStepId = twoSteps.getFirst().getId();
- String baseCuboidStepId = twoSteps.getSecond().getId();
-
- // convert htable
- AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, cuboidRootPath, result);
-
- // update cube info
- result.addTask(createUpdateCubeInfoAfterBuildStep(seg, intermediateHiveTableStepId, baseCuboidStepId, convertCuboidToHfileStep.getId(), jobId));
-
- final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
- final String hiveIntermediateTable = this.getIntermediateHiveTableName(intermediateTableDesc, jobId);
- result.addTask(createGarbageCollectionStep(seg, null, hiveIntermediateTable, toDeletePaths));
-
- return result;
- }
-
- public CubingJob buildAndMergeJob(CubeSegment appendSegment, CubeSegment mergeSegment) {
- checkPreconditions(appendSegment, mergeSegment);
-
- CubingJob result = initialJob(mergeSegment, "BUILD");
- result.setSegmentIds(Lists.newArrayList(new String[] { appendSegment.getUuid(), mergeSegment.getUuid() }));
- final String jobId = result.getId();
- final String appendRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/append_cuboid/";
- final String mergedRootPath = getJobWorkingDir(jobId) + "/" + appendSegment.getCubeInstance().getName() + "/cuboid/";
- List<String> mergingSegmentIds = Lists.newArrayList();
- List<String> mergingCuboidPaths = Lists.newArrayList();
- List<String> mergingHTables = Lists.newArrayList();
- final List<String> toDeletePaths = Lists.newArrayList();
-
- // cubing the incremental segment
- Pair<AbstractExecutable, AbstractExecutable> twoSteps = addCubingSteps(appendSegment, appendRootPath, result, toDeletePaths);
- final String intermediateHiveTableStepId = twoSteps.getFirst().getId();
- final String baseCuboidStepId = twoSteps.getSecond().getId();
-
- // update the append segment info
- result.addTask(createUpdateCubeInfoAfterBuildStep(appendSegment, intermediateHiveTableStepId, baseCuboidStepId, null, jobId));
-
- List<CubeSegment> mergingSegments = mergeSegment.getCubeInstance().getMergingSegments(mergeSegment);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
-
- for (CubeSegment merging : mergingSegments) {
- mergingSegmentIds.add(merging.getUuid());
- mergingHTables.add(merging.getStorageLocationIdentifier());
- if (merging.equals(appendSegment)) {
- mergingCuboidPaths.add(appendRootPath + "*");
- } else {
- mergingCuboidPaths.add(getPathToMerge(merging));
- }
- toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
- }
-
- // merge cuboid
- addMergeSteps(mergeSegment, mergingSegmentIds, mergingCuboidPaths, mergedRootPath, result);
-
- // convert htable
- AbstractExecutable convertCuboidToHfileStep = addHTableSteps(mergeSegment, mergedRootPath, result);
-
- // update cube info
- result.addTask(createUpdateCubeInfoAfterMergeStep(mergeSegment, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
- result.addTask(createGarbageCollectionStep(mergeSegment, mergingHTables, null, toDeletePaths));
-
- return result;
- }
-
- public CubingJob mergeJob(CubeSegment seg) {
- checkPreconditions(seg);
-
- CubingJob result = initialJob(seg, "MERGE");
- final String jobId = result.getId();
- final String mergedCuboidPath = getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/cuboid/";
-
- List<CubeSegment> mergingSegments = seg.getCubeInstance().getMergingSegments(seg);
- Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
- List<String> mergingSegmentIds = Lists.newArrayList();
- List<String> mergingCuboidPaths = Lists.newArrayList();
- List<String> mergingHTables = Lists.newArrayList();
- final List<String> toDeletePaths = Lists.newArrayList();
-
- for (CubeSegment merging : mergingSegments) {
- mergingSegmentIds.add(merging.getUuid());
- mergingCuboidPaths.add(getPathToMerge(merging));
- mergingHTables.add(merging.getStorageLocationIdentifier());
- toDeletePaths.add(getJobWorkingDir(merging.getLastBuildJobID()));
- }
-
- // merge cuboid
- addMergeSteps(seg, mergingSegmentIds, mergingCuboidPaths, mergedCuboidPath, result);
-
- // convert htable
- AbstractExecutable convertCuboidToHfileStep = addHTableSteps(seg, mergedCuboidPath, result);
-
- // update cube info
- result.addTask(createUpdateCubeInfoAfterMergeStep(seg, mergingSegmentIds, convertCuboidToHfileStep.getId(), jobId));
- result.addTask(createGarbageCollectionStep(seg, mergingHTables, null, toDeletePaths));
- return result;
- }
-
- void addMergeSteps(CubeSegment seg, List<String> mergingSegmentIds, List<String> mergingCuboidPaths, String mergedCuboidPath, CubingJob result) {
-
- result.addTask(createMergeDictionaryStep(seg, mergingSegmentIds));
-
- String formattedPath = StringUtils.join(mergingCuboidPaths, ",");
- result.addTask(createMergeCuboidDataStep(seg, formattedPath, mergedCuboidPath));
- }
-
- Pair<AbstractExecutable, AbstractExecutable> addCubingSteps(CubeSegment seg, String cuboidRootPath, CubingJob result, List<String> toDeletePaths) {
- final int groupRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getNCuboidBuildLevels();
- final int totalRowkeyColumnsCount = seg.getCubeDesc().getRowkey().getRowKeyColumns().length;
-
- final String jobId = result.getId();
- final CubeJoinedFlatTableDesc intermediateTableDesc = new CubeJoinedFlatTableDesc(seg.getCubeDesc(), seg);
- final String intermediateHiveTableName = getIntermediateHiveTableName(intermediateTableDesc, jobId);
- final String intermediateHiveTableLocation = getIntermediateHiveTableLocation(intermediateTableDesc, jobId);
- final String factDistinctColumnsPath = getFactDistinctColumnsPath(seg, jobId);
- final String[] cuboidOutputTempPath = getCuboidOutputPaths(cuboidRootPath, totalRowkeyColumnsCount, groupRowkeyColumnsCount);
-
- final AbstractExecutable intermediateHiveTableStep = createIntermediateHiveTableStep(intermediateTableDesc, jobId);
- result.addTask(intermediateHiveTableStep);
-
- result.addTask(createFactDistinctColumnsStep(seg, intermediateHiveTableName, jobId));
-
- result.addTask(createBuildDictionaryStep(seg, factDistinctColumnsPath));
-
- // base cuboid step
- final MapReduceExecutable baseCuboidStep = createBaseCuboidStep(seg, intermediateHiveTableLocation, cuboidOutputTempPath);
- result.addTask(baseCuboidStep);
-
- // n dim cuboid steps
- for (int i = 1; i <= groupRowkeyColumnsCount; i++) {
- int dimNum = totalRowkeyColumnsCount - i;
- result.addTask(createNDimensionCuboidStep(seg, cuboidOutputTempPath, dimNum, totalRowkeyColumnsCount));
- }
-
- toDeletePaths.add(intermediateHiveTableLocation);
- toDeletePaths.add(factDistinctColumnsPath);
-
- return new Pair<AbstractExecutable, AbstractExecutable>(intermediateHiveTableStep, baseCuboidStep);
- }
-
- AbstractExecutable addHTableSteps(CubeSegment seg, String cuboidRootPath, CubingJob result) {
- final String jobId = result.getId();
- final String cuboidPath = cuboidRootPath + "*";
-
- result.addTask(createRangeRowkeyDistributionStep(seg, cuboidPath, jobId));
- // create htable step
- result.addTask(createCreateHTableStep(seg, jobId));
- // generate hfiles step
- final MapReduceExecutable convertCuboidToHfileStep = createConvertCuboidToHfileStep(seg, cuboidPath, jobId);
- result.addTask(convertCuboidToHfileStep);
- // bulk load step
- result.addTask(createBulkLoadStep(seg, jobId));
-
- return convertCuboidToHfileStep;
- }
-
- private CubingJob initialJob(CubeSegment seg, String type) {
- CubingJob result = new CubingJob();
- SimpleDateFormat format = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
- format.setTimeZone(TimeZone.getTimeZone(engineConfig.getTimeZone()));
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setName(seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + type + " - " + format.format(new Date(System.currentTimeMillis())));
- result.setSubmitter(submitter);
- result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList());
- return result;
- }
-
- private void checkPreconditions(CubeSegment... segments) {
- for (CubeSegment seg : segments) {
- Preconditions.checkNotNull(seg, "segment cannot be null");
- }
- Preconditions.checkNotNull(engineConfig, "jobEngineConfig cannot be null");
- }
-
- private void appendMapReduceParameters(StringBuilder builder, CubeSegment seg) {
- try {
- String jobConf = engineConfig.getHadoopJobConfFilePath(seg.getCubeDesc().getModel().getCapacity());
- if (jobConf != null && jobConf.length() > 0) {
- builder.append(" -conf ").append(jobConf);
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
- String[] paths = new String[groupRowkeyColumnsCount + 1];
- for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
- int dimNum = totalRowkeyColumnCount - i;
- if (dimNum == totalRowkeyColumnCount) {
- paths[i] = cuboidRootPath + "base_cuboid";
- } else {
- paths[i] = cuboidRootPath + dimNum + "d_cuboid";
- }
- }
- return paths;
- }
-
- private String getPathToMerge(CubeSegment seg) {
- return getJobWorkingDir(seg.getLastBuildJobID()) + "/" + seg.getCubeInstance().getName() + "/cuboid/*";
- }
-
- private String getRowkeyDistributionOutputPath(CubeSegment seg, String jobId) {
- return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/rowkey_stats");
- }
-
- private String getFactDistinctColumnsPath(CubeSegment seg, String jobUuid) {
- return getJobWorkingDir(jobUuid) + "/" + seg.getCubeInstance().getName() + "/fact_distinct_columns";
- }
-
- private String getHFilePath(CubeSegment seg, String jobId) {
- return HadoopUtil.makeQualifiedPathInHBaseCluster(getJobWorkingDir(jobId) + "/" + seg.getCubeInstance().getName() + "/hfile/");
- }
-
- private MapReduceExecutable createFactDistinctColumnsStep(CubeSegment seg, String intermediateHiveTableName, String jobId) {
- MapReduceExecutable result = new MapReduceExecutable();
- result.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
- result.setMapReduceJobClass(FactDistinctColumnsJob.class);
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "output", getFactDistinctColumnsPath(seg, jobId));
- appendExecCmdParameters(cmd, "jobname", "Kylin_Fact_Distinct_Columns_" + seg.getCubeInstance().getName() + "_Step");
- appendExecCmdParameters(cmd, "tablename", intermediateHiveTableName);
-
- result.setMapReduceParams(cmd.toString());
- return result;
- }
-
- private HadoopShellExecutable createBuildDictionaryStep(CubeSegment seg, String factDistinctColumnsPath) {
- // base cuboid job
- HadoopShellExecutable buildDictionaryStep = new HadoopShellExecutable();
- buildDictionaryStep.setName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY);
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", factDistinctColumnsPath);
-
- buildDictionaryStep.setJobParams(cmd.toString());
- buildDictionaryStep.setJobClass(CreateDictionaryJob.class);
- return buildDictionaryStep;
- }
-
- private MapReduceExecutable createBaseCuboidStep(CubeSegment seg, String intermediateHiveTableLocation, String[] cuboidOutputTempPath) {
- // base cuboid job
- MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
-
- StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, seg);
-
- baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
-
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", intermediateHiveTableLocation);
- appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "level", "0");
-
- baseCuboidStep.setMapReduceParams(cmd.toString());
- baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
- return baseCuboidStep;
- }
-
- private MapReduceExecutable createNDimensionCuboidStep(CubeSegment seg, String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount) {
- // ND cuboid job
- MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
-
- ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
- appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
- appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getCubeInstance().getName() + "_Step");
- appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
-
- ndCuboidStep.setMapReduceParams(cmd.toString());
- ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
- return ndCuboidStep;
- }
-
- private MapReduceExecutable createRangeRowkeyDistributionStep(CubeSegment seg, String inputPath, String jobId) {
- MapReduceExecutable rowkeyDistributionStep = new MapReduceExecutable();
- rowkeyDistributionStep.setName(ExecutableConstants.STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getRowkeyDistributionOutputPath(seg, jobId));
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Region_Splits_Calculator_" + seg.getCubeInstance().getName() + "_Step");
-
- rowkeyDistributionStep.setMapReduceParams(cmd.toString());
- rowkeyDistributionStep.setMapReduceJobClass(RangeKeyDistributionJob.class);
- return rowkeyDistributionStep;
- }
-
- private HadoopShellExecutable createCreateHTableStep(CubeSegment seg, String jobId) {
- HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
- createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "input", getRowkeyDistributionOutputPath(seg, jobId) + "/part-r-00000");
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
-
- createHtableStep.setJobParams(cmd.toString());
- createHtableStep.setJobClass(CreateHTableJob.class);
-
- return createHtableStep;
- }
-
- private MapReduceExecutable createConvertCuboidToHfileStep(CubeSegment seg, String inputPath, String jobId) {
- MapReduceExecutable createHFilesStep = new MapReduceExecutable();
- createHFilesStep.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", getHFilePath(seg, jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "jobname", "Kylin_HFile_Generator_" + seg.getCubeInstance().getName() + "_Step");
-
- createHFilesStep.setMapReduceParams(cmd.toString());
- createHFilesStep.setMapReduceJobClass(CubeHFileJob.class);
-
- return createHFilesStep;
- }
-
- private HadoopShellExecutable createBulkLoadStep(CubeSegment seg, String jobId) {
- HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
- bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
-
- StringBuilder cmd = new StringBuilder();
- appendExecCmdParameters(cmd, "input", getHFilePath(seg, jobId));
- appendExecCmdParameters(cmd, "htablename", seg.getStorageLocationIdentifier());
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
-
- bulkLoadStep.setJobParams(cmd.toString());
- bulkLoadStep.setJobClass(BulkLoadJob.class);
-
- return bulkLoadStep;
-
- }
-
- /**
- * Builds the {@link UpdateCubeInfoAfterBuildStep} for the segment. The step is handed the
- * cubing job id and the ids of the flat-table, base-cuboid and HFile-conversion steps.
- */
- private UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(CubeSegment seg, String createFlatTableStepId, String baseCuboidStepId, String convertToHFileStepId, String jobId) {
- final UpdateCubeInfoAfterBuildStep step = new UpdateCubeInfoAfterBuildStep();
- step.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- // identify the cube segment being updated
- step.setCubeName(seg.getCubeInstance().getName());
- step.setSegmentId(seg.getUuid());
- step.setCubingJobId(jobId);
- // ids of the earlier steps of this job
- step.setCreateFlatTableStepId(createFlatTableStepId);
- step.setBaseCuboidStepId(baseCuboidStepId);
- step.setConvertToHFileStepId(convertToHFileStepId);
- return step;
- }
-
- private MergeDictionaryStep createMergeDictionaryStep(CubeSegment seg, List<String> mergingSegmentIds) {
- MergeDictionaryStep result = new MergeDictionaryStep();
- result.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- return result;
- }
-
- private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
- MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
- mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
- StringBuilder cmd = new StringBuilder();
-
- appendMapReduceParameters(cmd, seg);
- appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
- appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "input", inputPath);
- appendExecCmdParameters(cmd, "output", outputPath);
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
-
- mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
- return mergeCuboidDataStep;
- }
-
- private UpdateCubeInfoAfterMergeStep createUpdateCubeInfoAfterMergeStep(CubeSegment seg, List<String> mergingSegmentIds, String convertToHFileStepId, String jobId) {
- UpdateCubeInfoAfterMergeStep result = new UpdateCubeInfoAfterMergeStep();
- result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
- result.setCubeName(seg.getCubeInstance().getName());
- result.setSegmentId(seg.getUuid());
- result.setMergingSegmentIds(mergingSegmentIds);
- result.setConvertToHFileStepId(convertToHFileStepId);
- result.setCubingJobId(jobId);
- return result;
- }
-
- private GarbageCollectionStep createGarbageCollectionStep(CubeSegment seg, List<String> oldHtables, String hiveIntermediateTable, List<String> oldHdsfPaths) {
- GarbageCollectionStep result = new GarbageCollectionStep();
- result.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION);
- result.setOldHTables(oldHtables);
- result.setOldHiveTable(hiveIntermediateTable);
- result.setOldHdfsPaths(oldHdsfPaths);
- return result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kylin/blob/6b6aa313/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java b/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
deleted file mode 100644
index f2f1fc0..0000000
--- a/job/src/main/java/org/apache/kylin/job/cube/GarbageCollectionStep.java
+++ /dev/null
@@ -1,210 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.job.cube;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.job.cmd.ShellCmdOutput;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.metadata.realization.IRealizationConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- * Drops resources that are no longer needed: the intermediate Hive table (after a cube build),
- * old HBase tables (after a cube merge) and old HDFS paths.
- * <p>
- * The lists of tables and paths are stored as comma-separated job parameters, so an individual
- * entry must not contain a comma.
- */
-public class GarbageCollectionStep extends AbstractExecutable {
-
- private static final String OLD_HTABLES = "oldHTables";
-
- private static final String OLD_HIVE_TABLE = "oldHiveTable";
-
- private static final String OLD_HDFS_PATHS = "oldHdfsPaths";
-
- private static final Logger logger = LoggerFactory.getLogger(GarbageCollectionStep.class);
-
- /** Human-readable log of what was dropped/skipped; returned as the ExecuteResult output. */
- private StringBuffer output;
-
- public GarbageCollectionStep() {
- super();
- output = new StringBuffer();
- }
-
- /**
- * Drops old HBase tables, then the intermediate Hive table, then old HDFS paths.
- * An IOException from any stage aborts the remaining stages and yields an ERROR result.
- */
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
-
- try {
- dropHBaseTable(context);
- dropHiveTable(context);
- dropHdfsPath(context);
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- output.append("\n").append(e.getLocalizedMessage());
- return new ExecuteResult(ExecuteResult.State.ERROR, output.toString());
- }
-
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
- }
-
- /**
- * Drops the recorded intermediate Hive table, if any. It runs {@code hive -e} through the
- * configured CLI command executor inside the intermediate-table database.
- */
- private void dropHiveTable(ExecutableContext context) throws IOException {
- final String hiveTable = this.getOldHiveTable();
- if (StringUtils.isNotEmpty(hiveTable)) {
- // NOTE(review): the table name is concatenated into a shell command line; this assumes it
- // is an internally generated identifier and never user-controlled input.
- final String dropSQL = "USE " + KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable() + ";" + " DROP TABLE IF EXISTS " + hiveTable + ";";
- final String dropHiveCMD = "hive -e \"" + dropSQL + "\"";
- logger.info("executing: " + dropHiveCMD);
- ShellCmdOutput shellCmdOutput = new ShellCmdOutput();
- context.getConfig().getCliCommandExecutor().execute(dropHiveCMD, shellCmdOutput);
- logger.debug("Dropped Hive table " + hiveTable + " \n");
- output.append(shellCmdOutput.getOutput() + " \n");
- output.append("Dropped Hive table " + hiveTable + " \n");
- }
-
- }
-
- /**
- * Drops each recorded HBase table that still exists. A table is dropped only when its
- * {@link IRealizationConstants#HTableTag} value equals (ignoring case) this instance's
- * metadata URL prefix. Tables tagged by a different metadata store (presumably another Kylin
- * deployment sharing the cluster) are skipped.
- */
- private void dropHBaseTable(ExecutableContext context) throws IOException {
- List<String> oldTables = getOldHTables();
- if (oldTables != null && !oldTables.isEmpty()) {
- String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix();
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin admin = null;
- try {
- admin = new HBaseAdmin(conf);
- for (String table : oldTables) {
- if (admin.tableExists(table)) {
- HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table));
- String host = tableDescriptor.getValue(IRealizationConstants.HTableTag);
- // equalsIgnoreCase(null) is false, so untagged tables are skipped
- if (metadataUrlPrefix.equalsIgnoreCase(host)) {
- // HBase requires a table to be disabled before it can be deleted
- if (admin.isTableEnabled(table)) {
- admin.disableTable(table);
- }
- admin.deleteTable(table);
- logger.debug("Dropped HBase table " + table);
- output.append("Dropped HBase table " + table + " \n");
- } else {
- logger.debug("Skipped HBase table " + table);
- output.append("Skipped HBase table " + table + " \n");
- }
- }
- }
-
- } finally {
- if (admin != null) {
- try {
- admin.close();
- } catch (IOException e) {
- // best effort: a close failure must not mask the outcome of the drops,
- // but keep the stack trace for diagnosis
- logger.error(e.getLocalizedMessage(), e);
- }
- }
- }
- }
- }
-
- /**
- * Recursively deletes each given path on {@code fileSystem} if it exists.
- * A single trailing '*' is stripped, so an entry such as {@code /dir/*} deletes {@code /dir/}
- * itself. Entries that are empty after stripping are skipped; {@code new Path("")} would
- * otherwise throw an IllegalArgumentException that doWork() does not catch.
- */
- private void dropHdfsPathOnCluster(List<String> oldHdfsPaths, FileSystem fileSystem) throws IOException {
- if (oldHdfsPaths != null && !oldHdfsPaths.isEmpty()) {
- logger.debug("Drop HDFS path on FileSystem: " + fileSystem.getUri());
- output.append("Drop HDFS path on FileSystem: \"" + fileSystem.getUri() + "\" \n");
- for (String rawPath : oldHdfsPaths) {
- String path = rawPath.endsWith("*") ? rawPath.substring(0, rawPath.length() - 1) : rawPath;
- if (path.isEmpty()) {
- logger.warn("Skipped empty HDFS path entry: \"" + rawPath + "\"");
- output.append("Skipped empty HDFS path entry: \"" + rawPath + "\" \n");
- continue;
- }
-
- Path oldPath = new Path(path);
- if (fileSystem.exists(oldPath)) {
- fileSystem.delete(oldPath, true);
- logger.debug("Dropped HDFS path: " + path);
- output.append("Dropped HDFS path \"" + path + "\" \n");
- } else {
- logger.debug("HDFS path not exists: " + path);
- output.append("HDFS path not exists: \"" + path + "\" \n");
- }
- }
- }
- }
-
- /**
- * Drops the recorded HDFS paths on the default cluster. When a separate HBase cluster file
- * system is configured, it also drops them there.
- */
- private void dropHdfsPath(ExecutableContext context) throws IOException {
- List<String> oldHdfsPaths = this.getOldHdfsPaths();
- FileSystem fileSystem = FileSystem.get(HadoopUtil.getCurrentConfiguration());
- dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
-
- if (StringUtils.isNotEmpty(context.getConfig().getHBaseClusterFs())) {
- fileSystem = FileSystem.get(HadoopUtil.getCurrentHBaseConfiguration());
- dropHdfsPathOnCluster(oldHdfsPaths, fileSystem);
- }
-
- }
-
- public void setOldHTables(List<String> tables) {
- setArrayParam(OLD_HTABLES, tables);
- }
-
- private List<String> getOldHTables() {
- return getArrayParam(OLD_HTABLES);
- }
-
- public void setOldHdfsPaths(List<String> paths) {
- setArrayParam(OLD_HDFS_PATHS, paths);
- }
-
- private List<String> getOldHdfsPaths() {
- return getArrayParam(OLD_HDFS_PATHS);
- }
-
- /** Stores a list as a single comma-joined job parameter. */
- private void setArrayParam(String paramKey, List<String> paramValues) {
- setParam(paramKey, StringUtils.join(paramValues, ","));
- }
-
- /**
- * Reads a comma-joined job parameter back as a mutable list.
- * Returns an empty immutable list when the parameter is absent; StringUtils.split drops empty tokens.
- */
- private List<String> getArrayParam(String paramKey) {
- final String ids = getParam(paramKey);
- if (ids == null) {
- return Collections.emptyList();
- }
- return Lists.newArrayList(StringUtils.split(ids, ","));
- }
-
- public void setOldHiveTable(String hiveTable) {
- setParam(OLD_HIVE_TABLE, hiveTable);
- }
-
- private String getOldHiveTable() {
- return getParam(OLD_HIVE_TABLE);
- }
-
-}