You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/12 06:22:42 UTC
[22/97] [abbrv] [partial] incubator-kylin git commit: cleanup for
migration from github.com
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/JobManager.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/JobManager.java b/job/src/main/java/com/kylinolap/job/JobManager.java
deleted file mode 100644
index 91d3d43..0000000
--- a/job/src/main/java/com/kylinolap/job/JobManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job;
-
-import java.io.IOException;
-import java.net.UnknownHostException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.cube.CubeBuildTypeEnum;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.exception.CubeIntegrityException;
-import com.kylinolap.cube.project.ProjectInstance;
-import com.kylinolap.cube.project.ProjectManager;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngine;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.exception.InvalidJobInstanceException;
-import com.kylinolap.job.exception.JobException;
-import com.kylinolap.job.hadoop.hive.JoinedFlatTableDesc;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-
-/**
- * @author xjiang, ysong1
- *
- */
-public class JobManager {
-
- private static Logger log = LoggerFactory.getLogger(JobManager.class);
-
- private final KylinConfig config;
- private final JobEngineConfig engineConfig;
- private final JobEngine jobEngine;
- private final JobDAO jobDAO;
-
-    /**
-     * Wires the manager to the job store and to the job engine.
-     *
-     * @param engineID  identifier passed to {@code JobEngine.getInstance}
-     * @param engineCfg engine configuration; its {@code KylinConfig} is used to obtain the {@code JobDAO}
-     */
-    public JobManager(String engineID, JobEngineConfig engineCfg) throws JobException, UnknownHostException {
-        this.engineConfig = engineCfg;
-        this.config = engineCfg.getConfig();
-        this.jobDAO = JobDAO.getInstance(this.config);
-        this.jobEngine = JobEngine.getInstance(engineID, engineCfg);
-    }
-
-    /**
-     * Builds a new job instance for a cube segment and attaches the steps produced by
-     * {@code JobInstanceBuilder}. The job is only created in memory here; nothing is stored.
-     */
-    public JobInstance createJob(String cubeName, String segmentName, String segmentId, CubeBuildTypeEnum jobType, String submitter) throws IOException {
-        JobInstance newJob = buildJobInstance(cubeName, segmentName, segmentId, jobType, submitter);
-
-        // create job steps based on job type
-        newJob.addSteps(new JobInstanceBuilder(this.engineConfig).buildSteps(newJob));
-        return newJob;
-    }
-
-    /**
-     * Creates a bare job instance (no steps). The segment id is used as the job uuid and the
-     * display name is "cube - segment - type - timestamp", with the timestamp rendered in the
-     * engine's configured time zone.
-     */
-    private JobInstance buildJobInstance(String cubeName, String segmentName, String segmentId, CubeBuildTypeEnum jobType, String submitter) {
-        SimpleDateFormat timestampFormat = new SimpleDateFormat("z yyyy-MM-dd HH:mm:ss");
-        timestampFormat.setTimeZone(TimeZone.getTimeZone(this.engineConfig.getTimeZone()));
-
-        JobInstance job = new JobInstance();
-        job.setUuid(segmentId);
-        job.setType(jobType);
-        job.setRelatedCube(cubeName);
-        job.setRelatedSegment(segmentName);
-
-        StringBuilder displayName = new StringBuilder();
-        displayName.append(cubeName).append(" - ");
-        displayName.append(segmentName).append(" - ");
-        displayName.append(jobType.toString()).append(" - ");
-        displayName.append(timestampFormat.format(new Date()));
-        job.setName(displayName.toString());
-
-        job.setSubmitter(submitter);
-        return job;
-    }
-
-    /**
-     * Stores the job unless an equivalent one (see {@code hasDuplication}) already exists.
-     *
-     * @return the uuid of the stored job
-     * @throws InvalidJobInstanceException if the job duplicates an existing one
-     */
-    public String submitJob(JobInstance job) throws IOException, InvalidJobInstanceException {
-        if (hasDuplication(job)) {
-            throw new InvalidJobInstanceException("Job " + job.getName() + " is duplicated!");
-        }
-
-        // submitted job status should always be PENDING
-        jobDAO.updateJobInstance(job);
-        return job.getUuid();
-    }
-
- public void resumeJob(String uuid) throws IOException, JobException {
- JobInstance jobInstance = jobDAO.getJob(uuid);
- log.info("Resuming job " + uuid);
- if (jobInstance.getStatus() != JobStatusEnum.ERROR) {
- throw new RuntimeException("Can't resume job with status " + jobInstance.getStatus().toString());
- }
-
- for (JobStep jobStep : jobInstance.getSteps()) {
- if (jobStep.getStatus() == JobStepStatusEnum.ERROR) {
- jobStep.setStatus(JobStepStatusEnum.PENDING);
- // jobStep.setCmdOutput("");
- jobStep.clearInfo();
- jobDAO.saveJobOutput(jobStep, "");
- }
- }
- jobDAO.updateJobInstance(jobInstance);
- }
-
-    /**
-     * Checks all known jobs for one with the same related cube, related segment, type and
-     * status as {@code newJob}.
-     */
-    private boolean hasDuplication(JobInstance newJob) throws IOException {
-        for (JobInstance existing : listJobs(null, null)) {
-            boolean sameTarget = existing.getRelatedCube().equals(newJob.getRelatedCube()) && existing.getRelatedSegment().equals(newJob.getRelatedSegment());
-            if (sameTarget && existing.getType().equals(newJob.getType()) && existing.getStatus().equals(newJob.getStatus())) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    /**
-     * Discards a job in RUNNING, PENDING or ERROR status. In each of those cases the related
-     * cube segment is updated through {@code CubeManager.updateSegmentOnJobDiscard}, even if
-     * discarding the job itself fails.
-     *
-     * @throws IllegalStateException if the job is in any other status
-     */
-    public void discardJob(String uuid) throws IOException, CubeIntegrityException, JobException {
-        JobInstance jobInstance = jobDAO.getJob(uuid);
-        CubeInstance cube = CubeManager.getInstance(config).getCube(jobInstance.getRelatedCube());
-
-        switch (jobInstance.getStatus()) {
-        case RUNNING:
-        case PENDING:
-            // both states are cancelled the same way
-            try {
-                killRunningJob(jobInstance);
-            } finally {
-                CubeManager.getInstance(config).updateSegmentOnJobDiscard(cube, jobInstance.getRelatedSegment());
-            }
-            break;
-        case ERROR:
-            try {
-                // finished steps keep their status; everything else is marked discarded
-                for (JobStep step : jobInstance.getSteps()) {
-                    if (step.getStatus() != JobStepStatusEnum.FINISHED) {
-                        step.setStatus(JobStepStatusEnum.DISCARDED);
-                    }
-                }
-                jobDAO.updateJobInstance(jobInstance);
-            } finally {
-                CubeManager.getInstance(config).updateSegmentOnJobDiscard(cube, jobInstance.getRelatedSegment());
-            }
-            break;
-        default:
-            throw new IllegalStateException("Invalid status to discard : " + jobInstance.getStatus());
-        }
-    }
-
-    /**
-     * Marks the job's running step as DISCARDED, saves the job, then asks the engine to
-     * interrupt that step.
-     *
-     * @param jobInstance the job whose running step should be cancelled
-     * @throws IllegalStateException if the job has no running step
-     * @throws IOException
-     * @throws JobException
-     */
-    private void killRunningJob(JobInstance jobInstance) throws IOException, JobException {
-        JobStep activeStep = jobInstance.getRunningStep();
-        if (null == activeStep) {
-            throw new IllegalStateException("There is no running step in job " + jobInstance.getUuid());
-        }
-
-        // record the discard before the engine is asked to cancel the step
-        activeStep.setStatus(JobStepStatusEnum.DISCARDED);
-        activeStep.setExecEndTime(System.currentTimeMillis());
-        jobDAO.updateJobInstance(jobInstance);
-
-        this.jobEngine.interruptJob(jobInstance, activeStep);
-    }
-
-    /**
-     * Lists the jobs of a cube, optionally restricted to the cubes of a project.
-     *
-     * @param cubeName    passed to {@code jobDAO.listAllJobs}; callers in this class pass
-     *                    {@code null} to list every job
-     * @param projectName project to filter by; when {@code null} or unknown, no filtering is done
-     * @return the unfiltered job list, or the jobs whose related cube belongs to the project
-     */
-    public List<JobInstance> listJobs(String cubeName, String projectName) throws IOException {
-        List<JobInstance> jobs = jobDAO.listAllJobs(cubeName);
-        if (null == projectName) {
-            return jobs;
-        }
-
-        // Look the project up once: a second lookup could return null if the project
-        // disappears between the null check and its use.
-        ProjectInstance project = ProjectManager.getInstance(config).getProject(projectName);
-        if (null == project) {
-            return jobs;
-        }
-
-        List<JobInstance> filteredJobs = new ArrayList<JobInstance>();
-        for (JobInstance job : jobs) {
-            // NOTE(review): toUpperCase() uses the default locale; confirm how project cube names are normalized
-            if (project.getCubes().contains(job.getRelatedCube().toUpperCase())) {
-                filteredJobs.add(job);
-            }
-        }
-        return filteredJobs;
-    }
-
- /**
- * Returns the job stored under the given uuid, as loaded by {@code jobDAO.getJob}.
- */
- public JobInstance getJob(String uuid) throws IOException {
- return jobDAO.getJob(uuid);
- }
-
-    /**
-     * Returns the stored output of one job step, or an empty string when no output is stored.
-     */
-    public String getJobStepOutput(String jobUuid, int stepSequenceId) throws IOException {
-        JobStepOutput stepOutput = jobDAO.getJobOutput(jobUuid, stepSequenceId);
-        return (stepOutput == null) ? "" : stepOutput.getOutput();
-    }
-
- /**
- * Deletes the job with the given uuid by delegating to {@code jobDAO.deleteJob}.
- */
- public void deleteJob(String uuid) throws IOException {
- jobDAO.deleteJob(uuid);
- }
-
-    /**
-     * Deletes every job returned by {@code listJobs(null, null)}.
-     */
-    public void deleteAllJobs() throws IOException {
-        for (JobInstance doomed : listJobs(null, null)) {
-            jobDAO.deleteJob(doomed);
-        }
-    }
-
-    /**
-     * Generates the SELECT statement of the joined flat table for a READY segment of a cube.
-     */
-    public String previewFlatHiveQL(String cubeName, String segmentName) {
-        CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName);
-        CubeDesc descriptor = cube.getDescriptor();
-        CubeSegment readySegment = cube.getSegment(segmentName, CubeSegmentStatusEnum.READY);
-        return JoinedFlatTable.generateSelectDataStatement(new JoinedFlatTableDesc(descriptor, readySegment));
-    }
-
- /**
- * Starts the job engine with {@code JobConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS}.
- */
- public void startJobEngine() throws Exception {
- startJobEngine(JobConstants.DEFAULT_SCHEDULER_INTERVAL_SECONDS);
- }
-
- /**
- * Calls {@code jobDAO.updateRunningJobToError()} and then starts the job engine.
- *
- * @param daemonJobIntervalInSeconds
- * interval passed to {@code jobEngine.start}
- */
- public void startJobEngine(int daemonJobIntervalInSeconds) throws Exception {
- // NOTE(review): presumably flags jobs left RUNNING by a previous engine instance - confirm in JobDAO
- jobDAO.updateRunningJobToError();
- jobEngine.start(daemonJobIntervalInSeconds);
- }
-
- /**
- * Stops the job engine by delegating to {@code jobEngine.stop()}.
- */
- public void stopJobEngine() throws JobException {
- jobEngine.stop();
- }
-
- // Job engine metrics related methods. Each one is a direct delegate to the
- // method of the same name on the job engine.
-
- /**
- * @return the value of {@code jobEngine.getNumberOfJobStepsExecuted()}
- */
- public int getNumberOfJobStepsExecuted() {
- return jobEngine.getNumberOfJobStepsExecuted();
- }
-
- /**
- * @return the value of {@code jobEngine.getPrimaryEngineID()}
- */
- public String getPrimaryEngineID() throws Exception {
- return jobEngine.getPrimaryEngineID();
- }
-
- /**
- * @return the value of {@code jobEngine.getMinJobStepDuration()}
- */
- public double getMinJobStepDuration() {
- return jobEngine.getMinJobStepDuration();
- }
-
- /**
- * @return the value of {@code jobEngine.getMaxJobStepDuration()}
- */
- public double getMaxJobStepDuration() {
- return jobEngine.getMaxJobStepDuration();
- }
-
- /**
- * @param percentile
- * (eg. 95 percentile)
- * @return the percentile value
- */
- public double getPercentileJobStepDuration(double percentile) {
- return jobEngine.getPercentileJobStepDuration(percentile);
- }
-
- /**
- * Note: "Szie" is a misspelling of "Size"; the name is kept because the engine method
- * it delegates to is spelled the same way.
- *
- * @return the value of {@code jobEngine.getScheduledJobsSzie()}
- */
- public Integer getScheduledJobsSzie() {
- return jobEngine.getScheduledJobsSzie();
- }
-
- /**
- * @return the value of {@code jobEngine.getEngineThreadPoolSize()}
- */
- public int getEngineThreadPoolSize() {
- return jobEngine.getEngineThreadPoolSize();
- }
-
- /**
- * @return the value of {@code jobEngine.getNumberOfIdleSlots()}
- */
- public int getNumberOfIdleSlots() {
- return jobEngine.getNumberOfIdleSlots();
- }
-
- /**
- * @return the value of {@code jobEngine.getNumberOfJobStepsRunning()}
- */
- public int getNumberOfJobStepsRunning() {
- return jobEngine.getNumberOfJobStepsRunning();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/JobStepOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/JobStepOutput.java b/job/src/main/java/com/kylinolap/job/JobStepOutput.java
deleted file mode 100644
index cfa3e0c..0000000
--- a/job/src/main/java/com/kylinolap/job/JobStepOutput.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.kylinolap.common.persistence.RootPersistentEntity;
-
-/**
- * Persistent entity holding the output text of a single job step. Only the
- * fields annotated with {@code @JsonProperty} are serialized, because all
- * auto-detection is switched off on the class.
- *
- * @author ysong1
- *
- */
-
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JobStepOutput extends RootPersistentEntity {
-
- /**
- * Builds the name of a step output: the job uuid, a dot, then the step sequence id.
- */
- public static String nameOfOutput(String jobUuid, int stepSequenceId) {
- return jobUuid + "." + stepSequenceId;
- }
-
- @JsonProperty("name")
- private String name; // job uuid + "." +step sequence id
- @JsonProperty("output")
- private String output = ""; // defaults to empty, never null unless set so
-
- /**
- * @return the name
- */
- public String getName() {
- return name;
- }
-
- /**
- * @return the output
- */
- public String getOutput() {
- return output;
- }
-
- /**
- * @param output
- * the output to set
- */
- public void setOutput(String output) {
- this.output = output;
- }
-
- /**
- * @param name
- * the name to set
- */
- public void setName(String name) {
- this.name = name;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java b/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java
deleted file mode 100644
index c5f93b4..0000000
--- a/job/src/main/java/com/kylinolap/job/JoinedFlatTable.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.TimeZone;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-
-import com.kylinolap.common.util.StringUtil;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.hadoop.hive.JoinedFlatTableDesc;
-import com.kylinolap.job.hadoop.hive.JoinedFlatTableDesc.IntermediateColumnDesc;
-import com.kylinolap.job.hadoop.hive.SqlHiveDataTypeMapping;
-import com.kylinolap.metadata.model.cube.CubeDesc;
-import com.kylinolap.metadata.model.cube.DimensionDesc;
-import com.kylinolap.metadata.model.cube.JoinDesc;
-import com.kylinolap.metadata.model.cube.TblColRef;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class JoinedFlatTable {
-
- /**
- * Returns the directory of the intermediate table: {@code storageDfsDir}, a slash,
- * then the table name derived from the job uuid.
- */
- public static String getTableDir(JoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) {
- return storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID);
- }
-
- public static String generateCreateTableStatement(JoinedFlatTableDesc intermediateTableDesc, String storageDfsDir, String jobUUID) {
- StringBuilder ddl = new StringBuilder();
-
- ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName(jobUUID) + "\n");
-
- ddl.append("(" + "\n");
- for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
- IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
- if (i > 0) {
- ddl.append(",");
- }
- ddl.append(col.getColumnName() + " " + SqlHiveDataTypeMapping.getHiveDataType(col.getDataType()) + "\n");
- }
- ddl.append(")" + "\n");
-
- ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
- ddl.append("STORED AS SEQUENCEFILE" + "\n");
- ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName(jobUUID) + "'" + ";");
- // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
- // ";\n");
- return ddl.toString();
- }
-
-    /**
-     * Builds the Hive statement that drops the intermediate table if it exists.
-     */
-    public static String generateDropTableStatement(JoinedFlatTableDesc intermediateTableDesc, String jobUUID) {
-        return "DROP TABLE IF EXISTS " + intermediateTableDesc.getTableName(jobUUID) + ";";
-    }
-
-    /**
-     * Builds the Hive script that fills the intermediate table: optional {@code SET} lines
-     * taken from the Hadoop job configuration file, three hard-coded {@code SET} lines, then
-     * {@code INSERT OVERWRITE TABLE ... SELECT ...}.
-     *
-     * @param intermediateTableDesc description of the flat table to fill
-     * @param jobUUID               job uuid used to derive the table name
-     * @param engineConfig          supplies the path of the Hadoop job configuration file
-     * @return the complete script, terminated by {@code ;}
-     * @throws IOException if the configuration file cannot be read or parsed
-     */
-    public static String generateInsertDataStatement(JoinedFlatTableDesc intermediateTableDesc, String jobUUID, JobEngineConfig engineConfig) throws IOException {
-        StringBuilder sql = new StringBuilder();
-
-        File hadoopPropertiesFile = new File(engineConfig.getHadoopJobConfFilePath(intermediateTableDesc.getCubeDesc().getCapacity()));
-        if (hadoopPropertiesFile.exists()) {
-            appendHadoopProperties(hadoopPropertiesFile, sql);
-        }
-
-        // hard coded below mr parameters to enable map-side join
-        sql.append("SET hive.exec.compress.output=true;" + "\n");
-        sql.append("SET hive.auto.convert.join.noconditionaltask = true;" + "\n");
-        sql.append("SET hive.auto.convert.join.noconditionaltask.size = 300000000;" + "\n");
-        sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName(jobUUID) + "\n");
-
-        sql.append(generateSelectDataStatement(intermediateTableDesc));
-        sql.append(";");
-        return sql.toString();
-    }
-
-    /**
-     * Appends one {@code SET name=value;} line per property of the configuration file,
-     * skipping {@code tmpjars}. Name and value are read from each property's own children, so
-     * a property without a name or value is skipped instead of shifting later pairings.
-     */
-    private static void appendHadoopProperties(File hadoopPropertiesFile, StringBuilder sql) throws IOException {
-        Document doc;
-        try {
-            doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().parse(hadoopPropertiesFile);
-        } catch (ParserConfigurationException e) {
-            throw new IOException(e);
-        } catch (SAXException e) {
-            throw new IOException(e);
-        }
-
-        NodeList properties = doc.getElementsByTagName("property");
-        for (int i = 0; i < properties.getLength(); i++) {
-            org.w3c.dom.Node property = properties.item(i);
-            String name = firstChildText(property, "name");
-            String value = firstChildText(property, "value");
-            if (name == null || value == null) {
-                // incomplete or empty entry: nothing meaningful to SET
-                continue;
-            }
-            if (!name.equals("tmpjars")) {
-                sql.append("SET " + name + "=" + value + ";\n");
-            }
-        }
-    }
-
-    /**
-     * Returns the value of the first child node of the first direct child element named
-     * {@code tagName}, or {@code null} if that element is missing or empty.
-     */
-    private static String firstChildText(org.w3c.dom.Node parent, String tagName) {
-        for (org.w3c.dom.Node child = parent.getFirstChild(); child != null; child = child.getNextSibling()) {
-            if (child.getNodeType() == org.w3c.dom.Node.ELEMENT_NODE && tagName.equals(child.getNodeName())) {
-                org.w3c.dom.Node content = child.getFirstChild();
-                return content == null ? null : content.getNodeValue();
-            }
-        }
-        return null;
-    }
-
- public static String generateSelectDataStatement(JoinedFlatTableDesc intermediateTableDesc) {
- StringBuilder sql = new StringBuilder();
- sql.append("SELECT" + "\n");
- for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
- IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
- if (i > 0) {
- sql.append(",");
- }
- sql.append(col.getTableName() + "." + col.getColumnName() + "\n");
- }
- appendJoinStatement(intermediateTableDesc, sql);
- appendWhereStatement(intermediateTableDesc, sql);
- return sql.toString();
- }
-
-    /**
-     * Appends "FROM fact table" and one JOIN ... ON ... clause per distinct dimension table
-     * that has a non-empty join type. Each table is joined only once, even when several
-     * dimensions refer to it.
-     *
-     * @throws RuntimeException if a join has different numbers of primary and foreign key columns
-     */
-    private static void appendJoinStatement(JoinedFlatTableDesc intermediateTableDesc, StringBuilder sql) {
-        Set<String> joinedTables = new HashSet<String>();
-
-        CubeDesc cubeDesc = intermediateTableDesc.getCubeDesc();
-        String factTable = cubeDesc.getFactTable();
-        sql.append("FROM ").append(factTable).append("\n");
-
-        for (DimensionDesc dimension : cubeDesc.getDimensions()) {
-            JoinDesc join = dimension.getJoin();
-            if (join == null || join.getType().equals("")) {
-                continue;
-            }
-            String joinKeyword = join.getType().toUpperCase();
-            String dimensionTable = dimension.getTable();
-            if (joinedTables.contains(dimensionTable)) {
-                continue;
-            }
-
-            TblColRef[] primaryKeys = join.getPrimaryKeyColumns();
-            TblColRef[] foreignKeys = join.getForeignKeyColumns();
-            if (primaryKeys.length != foreignKeys.length) {
-                throw new RuntimeException("Invalid join condition of dimension " + dimension.getName());
-            }
-
-            sql.append(joinKeyword).append(" JOIN ").append(dimensionTable).append("\n");
-            sql.append("ON ");
-            for (int k = 0; k < primaryKeys.length; k++) {
-                if (k != 0) {
-                    sql.append(" AND ");
-                }
-                sql.append(factTable).append(".").append(foreignKeys[k].getName());
-                sql.append(" = ");
-                sql.append(dimensionTable).append(".").append(primaryKeys[k].getName());
-            }
-            sql.append("\n");
-
-            joinedTables.add(dimensionTable);
-        }
-    }
-
-    /**
-     * Appends the WHERE clause, if there is one. It combines the cube's filter condition (when
-     * non-empty) with a date range on the partition column (when a segment is present and its
-     * start and end are not both zero). Nothing is appended when neither applies.
-     */
-    private static void appendWhereStatement(JoinedFlatTableDesc intermediateTableDesc, StringBuilder sql) {
-        CubeDesc cubeDesc = intermediateTableDesc.getCubeDesc();
-        StringBuilder clause = new StringBuilder("WHERE");
-        boolean conditionAdded = false;
-
-        if (cubeDesc.getFilterCondition() != null && !cubeDesc.getFilterCondition().equals("")) {
-            clause.append(" (").append(cubeDesc.getFilterCondition()).append(") ");
-            conditionAdded = true;
-        }
-
-        CubeSegment segment = intermediateTableDesc.getCubeSegment();
-        if (segment != null) {
-            long rangeStart = segment.getDateRangeStart();
-            long rangeEnd = segment.getDateRangeEnd();
-
-            // when the cube asks for an immediate merge, the range starts at the end of the cube's date range
-            if (segment.getCubeInstance().needMergeImmediatelyAfterBuild(segment)) {
-                rangeStart = segment.getCubeInstance().getDateRange()[1];
-            }
-
-            if (rangeStart != 0 || rangeEnd != 0) {
-                String partitionColumn = cubeDesc.getCubePartitionDesc().getPartitionDateColumn();
-
-                clause.append(conditionAdded ? " AND (" : " (");
-                if (rangeStart > 0) {
-                    clause.append(partitionColumn).append(" >= '").append(formatDateTimeInWhereClause(rangeStart)).append("' AND ");
-                }
-                clause.append(partitionColumn).append(" < '").append(formatDateTimeInWhereClause(rangeEnd)).append("')\n");
-                conditionAdded = true;
-            }
-        }
-
-        if (!conditionAdded) {
-            return;
-        }
-        sql.append(clause.toString());
-    }
-
-    /**
-     * Formats a millisecond timestamp as "yyyy-MM-dd HH:mm:ss" in GMT and drops a trailing
-     * " 00:00:00", so that midnight values compare as plain dates.
-     */
-    private static String formatDateTimeInWhereClause(long datetime) {
-        SimpleDateFormat gmtFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
-        gmtFormat.setTimeZone(TimeZone.getTimeZone("GMT"));
-        // note "2014-10-01" >= "2014-10-01 00:00:00" is FALSE
-        return StringUtil.dropSuffix(gmtFormat.format(new Date(datetime)), " 00:00:00");
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/ICommandOutput.java b/job/src/main/java/com/kylinolap/job/cmd/ICommandOutput.java
deleted file mode 100644
index ca8cc08..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/ICommandOutput.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import com.kylinolap.job.constant.JobStepStatusEnum;
-
-/**
- * Result holder of a job command: a step status, the accumulated output text and an exit code.
- *
- * @author xjiang
- *
- */
-public interface ICommandOutput {
-
-    /** Sets the step status. */
-    void setStatus(JobStepStatusEnum status);
-
-    /** Returns the step status. */
-    JobStepStatusEnum getStatus();
-
-    /** Adds a message to the accumulated output. */
-    void appendOutput(String message);
-
-    /** Returns the accumulated output. */
-    String getOutput();
-
-    /** Sets the exit code of the command. */
-    void setExitCode(int exitCode);
-
-    /** Returns the exit code of the command. */
-    int getExitCode();
-
-    /** Resets this output; NOTE(review): presumably back to its initial state - confirm per implementation. */
-    void reset();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/IJobCommand.java b/job/src/main/java/com/kylinolap/job/cmd/IJobCommand.java
deleted file mode 100644
index eceada9..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/IJobCommand.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import com.kylinolap.job.exception.JobException;
-
-/**
- * A command that can be executed as part of a job and cancelled.
- *
- * @author xjiang
- *
- */
-public interface IJobCommand {
-
-    /** Runs the command and returns its output holder. */
-    ICommandOutput execute() throws JobException;
-
-    /** Cancels the command. */
-    void cancel() throws JobException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/JavaHadoopCmd.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/JavaHadoopCmd.java b/job/src/main/java/com/kylinolap/job/cmd/JavaHadoopCmd.java
deleted file mode 100644
index 12b4d2c..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/JavaHadoopCmd.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.exception.JobException;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-
-/**
- * @author xduo
- *
- */
-public class JavaHadoopCmd implements IJobCommand {
- protected static final Logger log = LoggerFactory.getLogger(JavaHadoopCmd.class);
-
- private final String executeCommand;
- private final ICommandOutput output;
- private final AbstractHadoopJob job;
-
- public JavaHadoopCmd(String executeCommand, String jobInstanceID, int jobStepID, JobEngineConfig engineConfig, AbstractHadoopJob job, boolean isAsync) {
- super();
- this.executeCommand = executeCommand;
- this.job = job;
- this.output = new JavaHadoopCmdOutput(jobInstanceID, jobStepID, engineConfig, job, isAsync);
- }
-
- @Override
- public ICommandOutput execute() throws JobException {
- output.appendOutput("Start to execute command: \n" + this.executeCommand);
- String[] args = executeCommand.trim().split("\\s+");
-
- try {
- output.setStatus(JobStepStatusEnum.RUNNING);
- int exitCode = ToolRunner.run(job, args);
- output.setExitCode(exitCode);
- } catch (Exception e) {
- output.appendOutput(e.getLocalizedMessage());
- output.setExitCode(-1);
- }
-
- output.appendOutput("Command execute return code " + output.getExitCode());
-
- if (output.getExitCode() != 0) {
- output.setStatus(JobStepStatusEnum.ERROR);
- }
-
- return output;
- }
-
- @Override
- public void cancel() throws JobException {
- job.kill();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/JavaHadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/JavaHadoopCmdOutput.java b/job/src/main/java/com/kylinolap/job/cmd/JavaHadoopCmdOutput.java
deleted file mode 100644
index 20dd9dd..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/JavaHadoopCmdOutput.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import java.util.Map;
-
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.job.tools.HadoopStatusChecker;
-
-/**
- * @author xduo
- *
- */
-public class JavaHadoopCmdOutput implements ICommandOutput {
-
- protected static final Logger log = LoggerFactory.getLogger(JavaHadoopCmdOutput.class);
-
- protected StringBuilder output;
- protected int exitCode;
- protected JobStepStatusEnum status;
- private final KylinConfig config;
- private final String jobInstanceID;
- private final int jobStepID;
- private final String yarnUrl;
- private final AbstractHadoopJob job;
- private String mrJobID = null;
- private String trackUrl = null;
- private boolean isAsync;
-
-    /**
-     * Creates the output holder for one step of a job and puts it into its initial state
-     * (empty output, exit code -1, status NEW).
-     */
-    public JavaHadoopCmdOutput(String jobInstanceID, int jobStepID, JobEngineConfig engineConfig, AbstractHadoopJob job, boolean isAsync) {
-        this.jobInstanceID = jobInstanceID;
-        this.jobStepID = jobStepID;
-        this.job = job;
-        this.isAsync = isAsync;
-        this.config = engineConfig.getConfig();
-        this.yarnUrl = engineConfig.getYarnStatusServiceUrl();
-
-        init();
-    }
-
- /**
- * Stores the given status. Note that {@code getStatus()} recomputes the status in most
- * cases, so the stored value is only returned as-is for an asynchronous command in ERROR.
- */
- @Override
- public void setStatus(JobStepStatusEnum status) {
- this.status = status;
- }
-
- @Override
- public JobStepStatusEnum getStatus() {
- if (this.isAsync) {
- if (this.status == JobStepStatusEnum.ERROR) {
- return status;
- }
-
- if (null == this.mrJobID || null == this.trackUrl) {
- updateHadoopJobInfo();
- }
-
- status = new HadoopStatusChecker(this.yarnUrl, this.mrJobID, output).checkStatus();
-
- if (this.status.isComplete()) {
- updateJobCounter();
- }
- } else {
- status = (this.exitCode == 0) ? JobStepStatusEnum.FINISHED : JobStepStatusEnum.ERROR;
- }
-
- return status;
- }
-
- /**
- * Logs the message at debug level and appends it, followed by a newline, to the output.
- */
- @Override
- public void appendOutput(String message) {
- log.debug(message);
- output.append(message).append("\n");
- }
-
- /**
- * @return everything appended to the output so far
- */
- @Override
- public String getOutput() {
- return output.toString();
- }
-
- @Override
- public void setExitCode(int exitCode) {
- this.exitCode = exitCode;
- }
-
- @Override
- public int getExitCode() {
- return exitCode;
- }
-
- /**
- * Puts this holder back into its initial state, see {@code init()}.
- */
- @Override
- public void reset() {
- init();
- }
-
- /**
- * Initial state: empty output, exit code -1, status NEW.
- */
- private void init() {
- output = new StringBuilder();
- exitCode = -1;
- status = JobStepStatusEnum.NEW;
- }
-
-    /**
-     * Copies the MR job id and the YARN tracking URL from the Hadoop job's info map into this
-     * holder and into the stored job step, once each becomes available. The job is saved only
-     * when something changed. Failures are logged and appended to the output, not rethrown.
-     */
-    private void updateHadoopJobInfo() {
-        try {
-            Map<String, String> hadoopInfo = job.getInfo();
-
-            JobDAO dao = JobDAO.getInstance(config);
-            JobInstance storedJob = dao.getJob(jobInstanceID);
-            JobStep step = storedJob.getSteps().get(jobStepID);
-            boolean dirty = false;
-
-            if (this.mrJobID == null && hadoopInfo.containsKey(JobInstance.MR_JOB_ID)) {
-                this.mrJobID = hadoopInfo.get(JobInstance.MR_JOB_ID);
-                step.putInfo(JobInstance.MR_JOB_ID, this.mrJobID);
-                output.append("Get job id " + this.mrJobID).append("\n");
-                dirty = true;
-            }
-
-            if (this.trackUrl == null && hadoopInfo.containsKey(JobInstance.YARN_APP_URL)) {
-                this.trackUrl = hadoopInfo.get(JobInstance.YARN_APP_URL);
-                step.putInfo(JobInstance.YARN_APP_URL, this.trackUrl);
-                output.append("Get job track url " + this.trackUrl).append("\n");
-                dirty = true;
-            }
-
-            if (dirty) {
-                dao.updateJobInstance(storedJob);
-            }
-        } catch (Exception e) {
-            log.error(e.getLocalizedMessage(), e);
-            output.append(e.getLocalizedMessage());
-        }
-    }
-
-    /**
-     * Appends the Hadoop job counters to the output and stores the map-input record count and
-     * the HDFS bytes written on the job step. Failures are logged and appended to the output,
-     * not rethrown.
-     */
-    private void updateJobCounter() {
-        try {
-            this.output.append(job.getCounters().toString()).append("\n");
-            log.debug(job.getCounters().toString());
-
-            JobDAO dao = JobDAO.getInstance(config);
-            JobInstance storedJob = dao.getJob(jobInstanceID);
-            JobStep step = storedJob.getSteps().get(jobStepID);
-
-            long sourceRecords = job.getCounters().findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue();
-            step.putInfo(JobInstance.SOURCE_RECORDS_COUNT, String.valueOf(sourceRecords));
-
-            long bytesWritten = job.getCounters().findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue();
-            step.putInfo(JobInstance.HDFS_BYTES_WRITTEN, String.valueOf(bytesWritten));
-
-            dao.updateJobInstance(storedJob);
-        } catch (Exception e) {
-            log.error(e.getLocalizedMessage(), e);
-            output.append(e.getLocalizedMessage());
-        }
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/JobCommandFactory.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/JobCommandFactory.java b/job/src/main/java/com/kylinolap/job/cmd/JobCommandFactory.java
deleted file mode 100644
index 39bda56..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/JobCommandFactory.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobStepCmdTypeEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import com.kylinolap.job.hadoop.cube.BaseCuboidMapper;
-import com.kylinolap.job.hadoop.cube.CubeHFileJob;
-import com.kylinolap.job.hadoop.cube.CuboidJob;
-import com.kylinolap.job.hadoop.cube.FactDistinctColumnsJob;
-import com.kylinolap.job.hadoop.cube.MergeCuboidJob;
-import com.kylinolap.job.hadoop.cube.NDCuboidMapper;
-import com.kylinolap.job.hadoop.cube.RangeKeyDistributionJob;
-import com.kylinolap.job.hadoop.dict.CreateDictionaryJob;
-import com.kylinolap.job.hadoop.hbase.BulkLoadJob;
-import com.kylinolap.job.hadoop.hbase.CreateHTableJob;
-
-/**
- * @author xjiang
- *
- */
-public class JobCommandFactory {
-
- public static IJobCommand getJobCommand(String command, JobInstance jobInstance, int jobStepID, JobEngineConfig engineConfig) {
- String instanceID = jobInstance.getUuid();
-
- boolean isRemote = engineConfig.isRunAsRemoteCommand();
- String hostname = isRemote ? engineConfig.getRemoteHadoopCliHostname() : null;
- String username = isRemote ? engineConfig.getRemoteHadoopCliUsername() : null;
- String password = isRemote ? engineConfig.getRemoteHadoopCliPassword() : null;
- JobStep jobStep = jobInstance.getSteps().get(jobStepID);
- boolean isAsync = jobStep.isRunAsync();
- JobStepCmdTypeEnum type = jobStep.getCmdType();
-
- switch (type) {
- case SHELL_CMD_HADOOP:
- return new ShellHadoopCmd(command, hostname, username, password, isAsync, instanceID, jobStepID, engineConfig);
- case JAVA_CMD_HADOOP_FACTDISTINCT:
- FactDistinctColumnsJob factDistinctJob = new FactDistinctColumnsJob();
- factDistinctJob.setAsync(isAsync);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, factDistinctJob, isAsync);
- case JAVA_CMD_HADOOP_BASECUBOID:
- CuboidJob baseCuboidJob = new CuboidJob();
- baseCuboidJob.setAsync(isAsync);
- baseCuboidJob.setMapperClass(BaseCuboidMapper.class);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, baseCuboidJob, isAsync);
- case JAVA_CMD_HADOOP_NDCUBOID:
- CuboidJob ndCuboidJob = new CuboidJob();
- ndCuboidJob.setAsync(isAsync);
- ndCuboidJob.setMapperClass(NDCuboidMapper.class);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, ndCuboidJob, isAsync);
- case JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION:
- AbstractHadoopJob rangeKeyDistributionJob = new RangeKeyDistributionJob();
- rangeKeyDistributionJob.setAsync(isAsync);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, rangeKeyDistributionJob, isAsync);
- case JAVA_CMD_HADOOP_CONVERTHFILE:
- CubeHFileJob cubeHFileJob = new CubeHFileJob();
- cubeHFileJob.setAsync(isAsync);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, cubeHFileJob, isAsync);
- case JAVA_CMD_HADOOP_MERGECUBOID:
- MergeCuboidJob mergeCuboidJob = new MergeCuboidJob();
- mergeCuboidJob.setAsync(isAsync);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, mergeCuboidJob, isAsync);
- case JAVA_CMD_HADOOP_NO_MR_DICTIONARY:
- CreateDictionaryJob createDictionaryJob = new CreateDictionaryJob();
- createDictionaryJob.setAsync(isAsync);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, createDictionaryJob, isAsync);
- case JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE:
- CreateHTableJob createHTableJob = new CreateHTableJob();
- createHTableJob.setAsync(isAsync);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, createHTableJob, isAsync);
- case JAVA_CMD_HADOOP_NO_MR_BULKLOAD:
- BulkLoadJob bulkLoadJob = new BulkLoadJob();
- bulkLoadJob.setAsync(isAsync);
- return new JavaHadoopCmd(command, instanceID, jobStepID, engineConfig, bulkLoadJob, isAsync);
- default:
- return new ShellCmd(command, hostname, username, password, isAsync);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java b/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java
deleted file mode 100644
index 6ecb85c..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/ShellCmd.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.PipedInputStream;
-import java.io.PipedOutputStream;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.FutureTask;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.jcraft.jsch.Channel;
-import com.jcraft.jsch.ChannelExec;
-import com.jcraft.jsch.JSch;
-import com.jcraft.jsch.Session;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.exception.JobException;
-
-/**
- * @author xjiang
- *
- * FIXME should reuse common.util.SSHClient
- */
-public class ShellCmd implements IJobCommand {
-
- private static Logger log = LoggerFactory.getLogger(ShellCmd.class);
-
- private final String executeCommand;
- private final ICommandOutput output;
- private final String remoteHost;
- private final String remoteUser;
- private final String remotePassword;
- private final String identityPath;
- private final boolean isAsync;
-
- private FutureTask<Integer> future;
-
- protected ShellCmd(String executeCmd, ICommandOutput out, String host, String user, String password, boolean async) {
- this.executeCommand = executeCmd;
- this.output = out;
- this.remoteHost = host;
- this.remoteUser = user;
- if (password != null && new File(password).exists()) {
- this.identityPath = new File(password).getAbsolutePath();
- this.remotePassword = null;
- } else {
- this.remotePassword = password;
- this.identityPath = null;
- }
- this.isAsync = async;
- }
-
- public ShellCmd(String executeCmd, String host, String user, String password, boolean async) {
- this(executeCmd, new ShellCmdOutput(), host, user, password, async);
- }
-
- @Override
- public ICommandOutput execute() throws JobException {
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- future = new FutureTask<Integer>(new Callable<Integer>() {
- public Integer call() throws JobException {
- executor.shutdown();
- return executeCommand(executeCommand);
- }
- });
- executor.execute(future);
-
- int exitCode = -1;
- if (!isAsync) {
- try {
- exitCode = future.get();
- log.info("finish executing");
- } catch (CancellationException e) {
- log.debug("Command is cancelled");
- exitCode = -2;
- } catch (Exception e) {
- throw new JobException("Error when exectute job " + executeCommand, e);
- } finally {
- if (exitCode == 0) {
- output.setStatus(JobStepStatusEnum.FINISHED);
- } else if (exitCode == -2) {
- output.setStatus(JobStepStatusEnum.DISCARDED);
- } else {
- output.setStatus(JobStepStatusEnum.ERROR);
- }
- output.setExitCode(exitCode);
- }
- }
- return output;
- }
-
- protected int executeCommand(String command) throws JobException {
- output.reset();
- if (remoteHost != null) {
- log.debug("Executing remote cmd: " + command);
- return remoteExec(command);
- } else {
- log.debug("Executing local cmd: " + command);
- return localExec(command);
- }
- }
-
- private int localExec(String command) throws JobException {
- output.setStatus(JobStepStatusEnum.RUNNING);
- String[] cmd = new String[3];
- cmd[0] = "/bin/bash";
- cmd[1] = "-c";
- cmd[2] = command;
-
- BufferedReader reader = null;
- int exitCode = -1;
- try {
- ProcessBuilder builder = new ProcessBuilder(cmd);
- builder.redirectErrorStream(true);
- Process proc = builder.start();
-
- reader = new BufferedReader(new InputStreamReader(proc.getInputStream()));
- String line = null;
- while ((line = reader.readLine()) != null) {
- output.appendOutput(line);
- }
-
- exitCode = proc.waitFor();
- } catch (Exception e) {
- throw new JobException(e);
- } finally {
- if (reader != null) {
- try {
- reader.close();
- } catch (IOException e) {
- throw new JobException(e);
- }
- }
- }
- return exitCode;
- }
-
- private int remoteExec(String command) throws JobException {
- output.setStatus(JobStepStatusEnum.RUNNING);
- Session session = null;
- Channel channel = null;
- int exitCode = -1;
- try {
- JSch jsch = new JSch();
- if (identityPath != null) {
- jsch.addIdentity(identityPath);
- }
-
- session = jsch.getSession(remoteUser, remoteHost, 22);
- if (remotePassword != null) {
- session.setPassword(remotePassword);
- }
- session.setConfig("StrictHostKeyChecking", "no");
- session.connect();
-
- channel = session.openChannel("exec");
- ((ChannelExec) channel).setCommand(command);
- channel.setInputStream(null);
- PipedInputStream in = new PipedInputStream(64 * 1024);
- PipedOutputStream out = new PipedOutputStream(in);
- channel.setOutputStream(out);
- ((ChannelExec) channel).setErrStream(out); // redirect error to out
- channel.connect();
-
- byte[] tmp = new byte[1024];
- while (true) {
- while (in.available() > 0) {
- int i = in.read(tmp, 0, 1024);
- if (i < 0)
- break;
- output.appendOutput(new String(tmp, 0, i));
- }
- if (channel.isClosed()) {
- if (in.available() > 0) {
- continue;
- }
- exitCode = channel.getExitStatus();
- break;
- }
- try {
- Thread.sleep(1000);
- } catch (Exception ee) {
- throw ee;
- }
- }
- } catch (Exception e) {
- throw new JobException(e);
- } finally {
- if (channel != null) {
- channel.disconnect();
- }
- if (session != null) {
- session.disconnect();
- }
- }
- return exitCode;
- }
-
- @Override
- public void cancel() throws JobException {
- future.cancel(true);
- }
-
- public static void main(String[] args) throws JobException {
- ShellCmdOutput output = new ShellCmdOutput();
- ShellCmd shellCmd = new ShellCmd(args[0], output, args[1], args[2], args[3], false);
- shellCmd.execute();
-
- System.out.println("============================================================================");
- System.out.println(output.getExitCode());
- System.out.println(output.getOutput());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/ShellCmdOutput.java b/job/src/main/java/com/kylinolap/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index 7197283..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmdOutput implements ICommandOutput {
-
- protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
-
- protected StringBuilder output;
- protected int exitCode;
- protected JobStepStatusEnum status;
-
- public ShellCmdOutput() {
- init();
- }
-
- private void init() {
- output = new StringBuilder();
- exitCode = -1;
- status = JobStepStatusEnum.NEW;
- }
-
- @Override
- public JobStepStatusEnum getStatus() {
- return status;
- }
-
- @Override
- public void setStatus(JobStepStatusEnum s) {
- this.status = s;
- }
-
- @Override
- public String getOutput() {
- return output.toString();
- }
-
- @Override
- public void appendOutput(String message) {
- output.append(message).append(System.getProperty("line.separator"));
- log.debug(message);
- }
-
- @Override
- public int getExitCode() {
- return exitCode;
- }
-
- @Override
- public void setExitCode(int code) {
- exitCode = code;
- }
-
- @Override
- public void reset() {
- init();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmd.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmd.java b/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmd.java
deleted file mode 100644
index f0b7a4e..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmd.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import java.io.IOException;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.exception.JobException;
-
-/**
- * @author xjiang
- *
- */
-public class ShellHadoopCmd extends ShellCmd {
- private static Logger log = LoggerFactory.getLogger(ShellHadoopCmd.class);
-
- private final String jobInstanceID;
- private final int jobStepID;
- private final JobEngineConfig engineConfig;
-
- public ShellHadoopCmd(String executeCmd, String host, String user, String password, boolean async, String instanceID, int stepID, JobEngineConfig engineConfig) {
- super(executeCmd, new ShellHadoopCmdOutput(instanceID, stepID, engineConfig), host, user, password, async);
- this.jobInstanceID = instanceID;
- this.jobStepID = stepID;
- this.engineConfig = engineConfig;
- }
-
- @Override
- public void cancel() throws JobException {
- JobDAO jobDAO = JobDAO.getInstance(engineConfig.getConfig());
- JobInstance jobInstance = null;
- try {
- jobInstance = jobDAO.getJob(jobInstanceID);
- String mrJobId = jobInstance.getSteps().get(jobStepID).getInfo(JobInstance.MR_JOB_ID);
- log.debug("kill MR job " + mrJobId);
- executeCommand("hadoop job -kill " + mrJobId);
- } catch (IOException e) {
- throw new JobException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmdOutput.java b/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmdOutput.java
deleted file mode 100644
index 6672862..0000000
--- a/job/src/main/java/com/kylinolap/job/cmd/ShellHadoopCmdOutput.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.cmd;
-
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.tools.HadoopStatusChecker;
-
-/**
- * @author xjiang
- *
- */
-public class ShellHadoopCmdOutput extends ShellCmdOutput {
-
- // mr job
- private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
- private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
- private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
- private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("HDFS: Number of bytes written=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("HDFS Read: (\\d+) HDFS Write");
-
- // hive
- private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
- private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("HDFS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
- private final KylinConfig config;
- private final String jobInstanceID;
- private final int jobStepID;
- private final String yarnUrl;
-
- private String mrJobID;
-
- public ShellHadoopCmdOutput(String jobInstanceID, int jobStepID, JobEngineConfig engineConfig) {
- super();
- this.config = engineConfig.getConfig();
- this.yarnUrl = engineConfig.getYarnStatusServiceUrl();
- this.jobInstanceID = jobInstanceID;
- this.jobStepID = jobStepID;
- this.mrJobID = null;
- }
-
- @Override
- public JobStepStatusEnum getStatus() {
- if (status.isComplete()) {
- return status;
- }
-
- if (null == mrJobID || mrJobID.trim().length() == 0) {
- return JobStepStatusEnum.WAITING;
- }
-
- HadoopStatusChecker hadoopJobChecker = new HadoopStatusChecker(this.yarnUrl, this.mrJobID, output);
- status = hadoopJobChecker.checkStatus();
-
- return status;
- }
-
- @Override
- public void appendOutput(String message) {
- super.appendOutput(message);
- try {
- updateJobStepInfo(message);
- } catch (IOException e) {
- log.error("Failed to append output!\n" + message, e);
- status = JobStepStatusEnum.ERROR;
- }
-
- }
-
- private void updateJobStepInfo(final String message) throws IOException {
-
- JobDAO jobDAO = JobDAO.getInstance(config);
- JobInstance jobInstance = jobDAO.getJob(jobInstanceID);
- JobStep jobStep = jobInstance.getSteps().get(jobStepID);
-
- Matcher matcher = PATTERN_APP_ID.matcher(message);
- if (matcher.find()) {
- String appId = matcher.group(1);
- jobStep.putInfo(JobInstance.YARN_APP_ID, appId);
- jobDAO.updateJobInstance(jobInstance);
- }
-
- matcher = PATTERN_APP_URL.matcher(message);
- if (matcher.find()) {
- String appTrackingUrl = matcher.group(1);
- jobStep.putInfo(JobInstance.YARN_APP_URL, appTrackingUrl);
- jobDAO.updateJobInstance(jobInstance);
- }
-
- matcher = PATTERN_JOB_ID.matcher(message);
- if (matcher.find()) {
- String mrJobID = matcher.group(1);
- jobStep.putInfo(JobInstance.MR_JOB_ID, mrJobID);
- jobDAO.updateJobInstance(jobInstance);
- this.mrJobID = mrJobID;
- log.debug("Get hadoop job id " + mrJobID);
- }
-
- matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- String hdfsWritten = matcher.group(1);
- jobStep.putInfo(JobInstance.HDFS_BYTES_WRITTEN, hdfsWritten);
- jobDAO.updateJobInstance(jobInstance);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
- if (matcher.find()) {
- String sourceCount = matcher.group(1);
- jobStep.putInfo(JobInstance.SOURCE_RECORDS_COUNT, sourceCount);
- jobDAO.updateJobInstance(jobInstance);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
- if (matcher.find()) {
- String sourceSize = matcher.group(1);
- jobStep.putInfo(JobInstance.SOURCE_RECORDS_SIZE, sourceSize);
- jobDAO.updateJobInstance(jobInstance);
- }
-
- // hive
- matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
- if (matcher.find()) {
- String jobId = matcher.group(1);
- String trackingUrl = matcher.group(2);
- jobStep.putInfo(JobInstance.MR_JOB_ID, jobId);
- jobStep.putInfo(JobInstance.YARN_APP_URL, trackingUrl);
- jobDAO.updateJobInstance(jobInstance);
- }
-
- matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- // String hdfsRead = matcher.group(1);
- String hdfsWritten = matcher.group(2);
- jobStep.putInfo(JobInstance.HDFS_BYTES_WRITTEN, hdfsWritten);
- jobDAO.updateJobInstance(jobInstance);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/constant/BatchConstants.java b/job/src/main/java/com/kylinolap/job/constant/BatchConstants.java
deleted file mode 100644
index b1dab02..0000000
--- a/job/src/main/java/com/kylinolap/job/constant/BatchConstants.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.constant;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public interface BatchConstants {
-
- public static final char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
-
- public static final String CFG_CUBE_NAME = "cube.name";
- public static final String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
-
- public static final String INPUT_DELIM = "input.delim";
-
- public static final String OUTPUT_PATH = "output.path";
-
- public static final String TABLE_NAME = "table.name";
- public static final String TABLE_COLUMNS = "table.columns";
-
- public static final String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
-
- public static final String MAPREDUCE_COUTNER_GROUP_NAME = "Cube Builder";
-
- public static final String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
- public static final String REGION_NUMBER = "region.number";
- public static final String CUBE_CAPACITY = "cube.capacity";
-
- public static final int COUNTER_MAX = 100000;
- public static final int ERROR_RECORD_THRESHOLD = 10;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/constant/JobConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/constant/JobConstants.java b/job/src/main/java/com/kylinolap/job/constant/JobConstants.java
deleted file mode 100644
index 21d099b..0000000
--- a/job/src/main/java/com/kylinolap/job/constant/JobConstants.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.constant;
-
-/**
- * @author ysong1, xduo
- *
- */
-public class JobConstants {
-
- public static final String GLOBAL_LISTENER_NAME = "ChainListener";
-
- public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
-
- public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
- public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
-
- public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
- public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
- public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
- public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
- public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
- public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
- public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
- public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
- public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
- public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
-
- public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
- public static final String PROP_JOB_FLOW = "jobFlow";
- public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
- public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
- public static final String PROP_COMMAND = "command";
- // public static final String PROP_STORAGE_LOCATION =
- // "storageLocationIdentifier";
- public static final String PROP_JOB_ASYNC = "jobAsync";
- public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
- public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
- public static final String PROP_JOB_KILLED = "jobKilled";
- public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
- public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/constant/JobStatusEnum.java b/job/src/main/java/com/kylinolap/job/constant/JobStatusEnum.java
deleted file mode 100644
index bf67d0a..0000000
--- a/job/src/main/java/com/kylinolap/job/constant/JobStatusEnum.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.constant;
-
-public enum JobStatusEnum {
-
- NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
-
- private final int code;
-
- private JobStatusEnum(int statusCode) {
- this.code = statusCode;
- }
-
- public static JobStatusEnum getByCode(int statusCode) {
- for (JobStatusEnum status : values()) {
- if (status.getCode() == statusCode) {
- return status;
- }
- }
-
- return null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public boolean isComplete() {
- return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/constant/JobStepCmdTypeEnum.java b/job/src/main/java/com/kylinolap/job/constant/JobStepCmdTypeEnum.java
deleted file mode 100644
index fde17d0..0000000
--- a/job/src/main/java/com/kylinolap/job/constant/JobStepCmdTypeEnum.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.constant;
-
-/**
- * @author xduo, ysong1
- *
- */
-public enum JobStepCmdTypeEnum {
- SHELL_CMD, SHELL_CMD_HADOOP, JAVA_CMD_HADOOP_FACTDISTINCT, JAVA_CMD_HADOOP_BASECUBOID, JAVA_CMD_HADOOP_NDCUBOID, JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, JAVA_CMD_HADOOP_CONVERTHFILE, JAVA_CMD_HADOOP_MERGECUBOID, JAVA_CMD_HADOOP_NO_MR_DICTIONARY, JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, JAVA_CMD_HADOOP_NO_MR_BULKLOAD
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/constant/JobStepStatusEnum.java b/job/src/main/java/com/kylinolap/job/constant/JobStepStatusEnum.java
deleted file mode 100644
index debfb3c..0000000
--- a/job/src/main/java/com/kylinolap/job/constant/JobStepStatusEnum.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.constant;
-
-public enum JobStepStatusEnum {
- NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32);
-
- private final int code;
-
- private JobStepStatusEnum(int statusCode) {
- this.code = statusCode;
- }
-
- public static JobStepStatusEnum getByCode(int statusCode) {
- for (JobStepStatusEnum status : values()) {
- if (status.getCode() == statusCode) {
- return status;
- }
- }
-
- return null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public boolean isComplete() {
- return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode();
- }
-
- public boolean isRunable() {
- return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/constant/SchedulerTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/constant/SchedulerTypeEnum.java b/job/src/main/java/com/kylinolap/job/constant/SchedulerTypeEnum.java
deleted file mode 100644
index 8078023..0000000
--- a/job/src/main/java/com/kylinolap/job/constant/SchedulerTypeEnum.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.constant;
-
-/**
- * @author xduo
- *
- */
-public enum SchedulerTypeEnum {
- QUATZ;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/deployment/DeploymentUtilityChecker.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/deployment/DeploymentUtilityChecker.java b/job/src/main/java/com/kylinolap/job/deployment/DeploymentUtilityChecker.java
deleted file mode 100644
index 00333bc..0000000
--- a/job/src/main/java/com/kylinolap/job/deployment/DeploymentUtilityChecker.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.kylinolap.job.deployment;
-
-/**
- * Deployment checks, one constant per check. Each constant supplies its own
- * {@link #isOkay()} implementation; both currently report success
- * unconditionally.
- * <p>
- * Created by honma on 9/29/14.
- */
-public enum DeploymentUtilityChecker {
-
-    HIVE_CHECKER {
-        /** Always reports success. */
-        @Override
-        boolean isOkay() {
-            return true;
-        }
-    },
-
-    HBASE_CHECKER {
-        /** Always reports success. */
-        @Override
-        boolean isOkay() {
-            return true;
-        }
-    };
-
-    /**
-     * @return {@code true} when this check passes
-     */
-    abstract boolean isOkay();
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/deployment/HbaseConfigPrinter.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/deployment/HbaseConfigPrinter.java b/job/src/main/java/com/kylinolap/job/deployment/HbaseConfigPrinter.java
deleted file mode 100644
index ddf6251..0000000
--- a/job/src/main/java/com/kylinolap/job/deployment/HbaseConfigPrinter.java
+++ /dev/null
@@ -1,131 +0,0 @@
-package com.kylinolap.job.deployment;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Map;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HConstants;
-
-import com.kylinolap.job.tools.LZOSupportnessChecker;
-
-/**
- * Writes HBase and environment related settings to a file as shell
- * {@code export KYLIN_...=value} lines.
- * <p/>
- * Created by honma on 9/30/14.
- * <p/>
- * This class is assumed to be run by
- * "hbase org.apache.hadoop.util.RunJar kylin-job-0.5.7-SNAPSHOT-job.jar com.kylinolap.job.deployment.HbaseConfigPrinter targetFile"
- * in the shell, so that hbase and hadoop related environment variables will be
- * visible to this class.
- */
-public class HbaseConfigPrinter {
-
-    /**
-     * Command line entry point. Prints a usage line and exits with status 1
-     * unless exactly one argument is given.
-     *
-     * @param args a single element: the path of the file to write
-     * @throws IOException propagated from writing the target file
-     */
-    public static void main(String[] args) throws IOException {
-        if (args.length != 1) {
-            // The class named in the usage line must be this class, otherwise the printed command cannot be run.
-            System.out.println("Usage: hbase org.apache.hadoop.util.RunJar kylin-job-0.5.7-SNAPSHOT-job.jar com.kylinolap.job.deployment.HbaseConfigPrinter targetFile");
-            System.exit(1);
-        }
-
-        printConfigs(args[0]);
-    }
-
-    /**
-     * Collects one value from each {@link ConfigLoader} and writes them to
-     * {@code targetFile}, one {@code export} line per value.
-     *
-     * @param targetFile path of the file to write
-     * @throws IllegalStateException if {@code targetFile} is an existing directory
-     * @throws IOException propagated from writing the file
-     */
-    private static void printConfigs(String targetFile) throws IOException {
-
-        File output = new File(targetFile);
-        if (output.exists() && output.isDirectory()) {
-            throw new IllegalStateException("The output file: " + targetFile + " is a directory");
-        }
-
-        StringBuilder sb = new StringBuilder();
-
-        // NOTE(review): a loader that returns null is written as the literal text "null"; confirm
-        // whether the consuming script expects that before changing it.
-        sb.append("export KYLIN_LZO_SUPPORTED=" + ConfigLoader.LZO_INFO_LOADER.loadValue() + "\n");
-        sb.append("export KYLIN_LD_LIBRARY_PATH=" + ConfigLoader.LD_LIBRARY_PATH_LOADER.loadValue() + "\n");
-        sb.append("export KYLIN_HBASE_CLASSPATH=" + ConfigLoader.HBASE_CLASSPATH_LOADER.loadValue() + "\n");
-        sb.append("export KYLIN_HBASE_CONF_PATH=" + ConfigLoader.HBASE_CONF_FOLDER_LOADER.loadValue() + "\n");
-        sb.append("export KYLIN_ZOOKEEPER_QUORUM=" + ConfigLoader.ZOOKEEP_QUORUM_LOADER.loadValue() + "\n");
-        sb.append("export KYLIN_ZOOKEEPER_CLIENT_PORT=" + ConfigLoader.ZOOKEEPER_CLIENT_PORT_LOADER.loadValue() + "\n");
-        sb.append("export KYLIN_ZOOKEEPER_ZNODE_PARENT=" + ConfigLoader.ZOOKEEPER_ZNODE_PARENT_LOADER.loadValue() + "\n");
-
-        FileUtils.writeStringToFile(output, sb.toString());
-    }
-
-    /**
-     * Debugging aid: prints every environment variable to standard output.
-     */
-    @SuppressWarnings("unused")
-    private static void printAllEnv() {
-        for (Map.Entry<String, String> entry : System.getenv().entrySet()) {
-            System.out.println("Key: " + entry.getKey());
-            System.out.println("Value: " + entry.getValue());
-            System.out.println();
-        }
-    }
-
-    /**
-     * One constant per exported setting; each knows how to obtain its value.
-     */
-    enum ConfigLoader {
-
-        /** Renders the result of {@code LZOSupportnessChecker.getSupportness()} as "true" or "false". */
-        LZO_INFO_LOADER {
-            @Override
-            public String loadValue() {
-                return LZOSupportnessChecker.getSupportness() ? "true" : "false";
-            }
-        },
-
-        /** Reads {@code HConstants.ZOOKEEPER_QUORUM} from the HBase configuration. */
-        ZOOKEEP_QUORUM_LOADER {
-            @Override
-            public String loadValue() {
-                return hbaseProperty(HConstants.ZOOKEEPER_QUORUM);
-            }
-        },
-
-        /** Reads {@code HConstants.ZOOKEEPER_ZNODE_PARENT} from the HBase configuration. */
-        ZOOKEEPER_ZNODE_PARENT_LOADER {
-            @Override
-            public String loadValue() {
-                return hbaseProperty(HConstants.ZOOKEEPER_ZNODE_PARENT);
-            }
-        },
-
-        /** Reads {@code HConstants.ZOOKEEPER_CLIENT_PORT} from the HBase configuration. */
-        ZOOKEEPER_CLIENT_PORT_LOADER {
-            @Override
-            public String loadValue() {
-                return hbaseProperty(HConstants.ZOOKEEPER_CLIENT_PORT);
-            }
-        },
-
-        /** Reads the {@code LD_LIBRARY_PATH} environment variable. */
-        LD_LIBRARY_PATH_LOADER {
-            @Override
-            public String loadValue() {
-                return System.getenv("LD_LIBRARY_PATH");
-            }
-        },
-
-        /** Reads the {@code CLASSPATH} environment variable. */
-        HBASE_CLASSPATH_LOADER {
-            @Override
-            public String loadValue() {
-                return System.getenv("CLASSPATH");
-            }
-        },
-
-        /**
-         * Scans the ":"-separated CLASSPATH for existing directories named
-         * "conf" (case-insensitive) and returns them joined together, each one
-         * preceded by ":". Returns an empty string when none is found or when
-         * CLASSPATH is not set.
-         */
-        HBASE_CONF_FOLDER_LOADER {
-            @Override
-            public String loadValue() {
-                String classpath = HBASE_CLASSPATH_LOADER.loadValue();
-                if (classpath == null) {
-                    // CLASSPATH is not set, so there is nothing to scan; calling split() on null would throw.
-                    return "";
-                }
-
-                StringBuilder sb = new StringBuilder();
-                for (String path : classpath.split(":")) {
-                    path = path.trim();
-                    File f = new File(path);
-                    if (StringUtils.containsIgnoreCase(path, "conf") && f.exists() && f.isDirectory() && f.getName().equalsIgnoreCase("conf")) {
-                        sb.append(":" + path);
-                    }
-                }
-                return sb.toString();
-            }
-        };
-
-        /**
-         * @return the value for this setting; may be {@code null} when the
-         *         underlying environment variable or configuration key is absent
-         */
-        public abstract String loadValue();
-
-        /** Creates an HBase configuration and returns the value stored under {@code key}. */
-        private static String hbaseProperty(String key) {
-            Configuration conf = HBaseConfiguration.create();
-            return conf.get(key);
-        }
-    }
-
-}