You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by qh...@apache.org on 2015/07/27 11:21:35 UTC
[33/52] [abbrv] incubator-kylin git commit: KYLIN-875 Split job
module into 'core-job', 'engine-mr', 'source-hive',
'storage-hbase'. The old job remains as an assembly project.
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
index b00e10e..f8d6e1a 100644
--- a/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
+++ b/job/src/main/java/org/apache/kylin/job/CubeMetadataUpgrade.java
@@ -45,23 +45,20 @@ import org.apache.kylin.cube.model.v1.CubeSegment;
import org.apache.kylin.cube.model.v1.CubeSegmentStatusEnum;
import org.apache.kylin.cube.model.v1.CubeStatusEnum;
import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.job.common.HadoopShellExecutable;
-import org.apache.kylin.job.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
+import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
+import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
+import org.apache.kylin.engine.mr.steps.RangeKeyDistributionJob;
import org.apache.kylin.job.common.ShellExecutable;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobStepStatusEnum;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.hadoop.cube.BaseCuboidJob;
-import org.apache.kylin.job.hadoop.cube.CubeHFileJob;
-import org.apache.kylin.job.hadoop.cube.FactDistinctColumnsJob;
-import org.apache.kylin.job.hadoop.cube.MergeCuboidJob;
-import org.apache.kylin.job.hadoop.cube.NDCuboidJob;
-import org.apache.kylin.job.hadoop.cube.RangeKeyDistributionJob;
-import org.apache.kylin.job.hadoop.dict.CreateDictionaryJob;
-import org.apache.kylin.job.hadoop.hbase.BulkLoadJob;
-import org.apache.kylin.job.hadoop.hbase.CreateHTableJob;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.MetadataManager;
@@ -72,6 +69,9 @@ import org.apache.kylin.metadata.project.ProjectManager;
import org.apache.kylin.metadata.project.RealizationEntry;
import org.apache.kylin.metadata.realization.RealizationStatusEnum;
import org.apache.kylin.metadata.realization.RealizationType;
+import org.apache.kylin.storage.hbase.steps.BulkLoadJob;
+import org.apache.kylin.storage.hbase.steps.CreateHTableJob;
+import org.apache.kylin.storage.hbase.steps.CubeHFileJob;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/JobInstance.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JobInstance.java b/job/src/main/java/org/apache/kylin/job/JobInstance.java
deleted file mode 100644
index e7f5772..0000000
--- a/job/src/main/java/org/apache/kylin/job/JobInstance.java
+++ /dev/null
@@ -1,504 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
-import com.fasterxml.jackson.annotation.JsonBackReference;
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import com.fasterxml.jackson.annotation.JsonManagedReference;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Lists;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-import org.apache.kylin.cube.model.CubeBuildTypeEnum;
-import org.apache.kylin.job.constant.JobStatusEnum;
-import org.apache.kylin.job.constant.JobStepCmdTypeEnum;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.engine.JobEngineConfig;
-
-@JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
-public class JobInstance extends RootPersistentEntity implements Comparable<JobInstance> {
-
- public static final String JOB_WORKING_DIR_PREFIX = "kylin-";
-
- public static final String YARN_APP_ID = "yarn_application_id";
- public static final String YARN_APP_URL = "yarn_application_tracking_url";
- public static final String MR_JOB_ID = "mr_job_id";
- public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
- public static final String SOURCE_RECORDS_COUNT = "source_records_count";
- public static final String SOURCE_RECORDS_SIZE = "source_records_size";
-
- public static String getStepIdentity(JobInstance jobInstance, JobStep jobStep) {
- return jobInstance.getRelatedCube() + "." + jobInstance.getUuid() + "." + jobStep.getSequenceID();
- }
-
- /**
-  * Builds the identity string of a job: its related cube and its UUID, joined with a dot.
-  */
- public static String getJobIdentity(JobInstance jobInstance) {
- return jobInstance.getRelatedCube() + "." + jobInstance.getUuid();
- }
-
- /**
-  * Convenience overload: resolves the working directory of the given job from its UUID
-  * and the HDFS working directory reported by the engine config.
-  */
- public static String getJobWorkingDir(JobInstance jobInstance, JobEngineConfig engineConfig) {
- return getJobWorkingDir(jobInstance.getUuid(), engineConfig.getHdfsWorkingDirectory());
- }
-
- public static String getJobWorkingDir(String jobUuid, String hdfsWorkdingDir) {
- if (jobUuid == null || jobUuid.equals("")) {
- throw new IllegalArgumentException("jobUuid can't be null or empty");
- }
- return hdfsWorkdingDir + "/" + JOB_WORKING_DIR_PREFIX + jobUuid;
- }
-
- @JsonProperty("name")
- private String name;
-
- @JsonProperty("type")
- private CubeBuildTypeEnum type; // java implementation
- @JsonProperty("duration")
- private long duration;
- @JsonProperty("related_cube")
- private String relatedCube;
- @JsonProperty("related_segment")
- private String relatedSegment;
- @JsonProperty("exec_start_time")
- private long execStartTime;
- @JsonProperty("exec_end_time")
- private long execEndTime;
- @JsonProperty("mr_waiting")
- private long mrWaiting = 0;
- @JsonManagedReference
- @JsonProperty("steps")
- private List<JobStep> steps;
- @JsonProperty("submitter")
- private String submitter;
- @JsonProperty("job_status")
- private JobStatusEnum status;
-
- public JobStep getRunningStep() {
- for (JobStep step : this.getSteps()) {
- if (step.getStatus().equals(JobStepStatusEnum.RUNNING) || step.getStatus().equals(JobStepStatusEnum.WAITING)) {
- return step;
- }
- }
-
- return null;
- }
-
- /**
-  * Percentage (0.0 - 100.0) of steps whose status is FINISHED.
-  *
-  * @return 0.0 when the job has no steps; otherwise 100 * finished / total
-  */
- @JsonProperty("progress")
- public double getProgress() {
-     List<JobStep> allSteps = this.getSteps();
-     if (allSteps.isEmpty()) {
-         // Without this guard the division below is 0.0 / 0, i.e. NaN.
-         return 0.0;
-     }
-
-     int completedStepCount = 0;
-     for (JobStep step : allSteps) {
-         // Identity comparison tolerates steps whose status has not been set yet.
-         if (step.getStatus() == JobStepStatusEnum.FINISHED) {
-             completedStepCount++;
-         }
-     }
-
-     return 100.0 * completedStepCount / allSteps.size();
- }
-
- public JobStatusEnum getStatus() {
- return this.status;
- }
-
- public void setStatus(JobStatusEnum status) {
- this.status = status;
- }
-
-// @JsonProperty("job_status")
-// public JobStatusEnum getStatus() {
-//
-// // JobStatusEnum finalJobStatus;
-// int compositResult = 0;
-//
-// // if steps status are all NEW, then job status is NEW
-// // if steps status are all FINISHED, then job status is FINISHED
-// // if steps status are all PENDING, then job status is PENDING
-// // if steps status are FINISHED and PENDING, the job status is PENDING
-// // if one of steps status is RUNNING, then job status is RUNNING
-// // if one of steps status is ERROR, then job status is ERROR
-// // if one of steps status is KILLED, then job status is KILLED
-// // default status is RUNNING
-//
-// System.out.println(this.getName());
-//
-// for (JobStep step : this.getSteps()) {
-// //System.out.println("step: " + step.getSequenceID() + "'s status:" + step.getStatus());
-// compositResult = compositResult | step.getStatus().getCode();
-// }
-//
-// System.out.println();
-//
-// if (compositResult == JobStatusEnum.FINISHED.getCode()) {
-// return JobStatusEnum.FINISHED;
-// } else if (compositResult == JobStatusEnum.NEW.getCode()) {
-// return JobStatusEnum.NEW;
-// } else if (compositResult == JobStatusEnum.PENDING.getCode()) {
-// return JobStatusEnum.PENDING;
-// } else if (compositResult == (JobStatusEnum.FINISHED.getCode() | JobStatusEnum.PENDING.getCode())) {
-// return JobStatusEnum.PENDING;
-// } else if ((compositResult & JobStatusEnum.ERROR.getCode()) == JobStatusEnum.ERROR.getCode()) {
-// return JobStatusEnum.ERROR;
-// } else if ((compositResult & JobStatusEnum.DISCARDED.getCode()) == JobStatusEnum.DISCARDED.getCode()) {
-// return JobStatusEnum.DISCARDED;
-// } else if ((compositResult & JobStatusEnum.RUNNING.getCode()) == JobStatusEnum.RUNNING.getCode()) {
-// return JobStatusEnum.RUNNING;
-// }
-//
-// return JobStatusEnum.RUNNING;
-// }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public CubeBuildTypeEnum getType() {
- return type;
- }
-
- public void setType(CubeBuildTypeEnum type) {
- this.type = type;
- }
-
- public long getDuration() {
- return duration;
- }
-
- public void setDuration(long duration) {
- this.duration = duration;
- }
-
- public String getRelatedCube() {
- return relatedCube;
- }
-
- public void setRelatedCube(String relatedCube) {
- this.relatedCube = relatedCube;
- }
-
- public String getRelatedSegment() {
- return relatedSegment;
- }
-
- public void setRelatedSegment(String relatedSegment) {
- this.relatedSegment = relatedSegment;
- }
-
- /**
- * @return the execStartTime
- */
- public long getExecStartTime() {
- return execStartTime;
- }
-
- /**
- * @param execStartTime the execStartTime to set
- */
- public void setExecStartTime(long execStartTime) {
- this.execStartTime = execStartTime;
- }
-
- /**
- * @return the execEndTime
- */
- public long getExecEndTime() {
- return execEndTime;
- }
-
- /**
- * @param execEndTime the execEndTime to set
- */
- public void setExecEndTime(long execEndTime) {
- this.execEndTime = execEndTime;
- }
-
- public long getMrWaiting() {
- return this.mrWaiting;
- }
-
- public void setMrWaiting(long mrWaiting) {
- this.mrWaiting = mrWaiting;
- }
-
- /**
-  * Returns the live, mutable list of steps; never {@code null}.
-  */
- public List<JobStep> getSteps() {
- // Lazily create the list so callers can iterate or add without a null check.
- if (steps == null) {
- steps = Lists.newArrayList();
- }
- return steps;
- }
-
- public void clearSteps() {
- getSteps().clear();
- }
-
- public void addSteps(Collection<JobStep> steps) {
- this.getSteps().addAll(steps);
- }
-
- public void addStep(JobStep step) {
- getSteps().add(step);
- }
-
- public void addStep(int index, JobStep step) {
- getSteps().add(index, step);
- }
-
- public JobStep findStep(String stepName) {
- for (JobStep step : getSteps()) {
- if (stepName.equals(step.getName())) {
- return step;
- }
- }
- return null;
- }
-
-
- public String getSubmitter() {
- return submitter;
- }
-
- public void setSubmitter(String submitter) {
- this.submitter = submitter;
- }
-
-
-
-
- @JsonIgnoreProperties(ignoreUnknown = true)
- public static class JobStep implements Comparable<JobStep> {
-
- @JsonBackReference
- private JobInstance jobInstance;
-
- @JsonProperty("id")
- private String id;
-
- @JsonProperty("name")
- private String name;
-
- @JsonProperty("sequence_id")
- private int sequenceID;
-
- @JsonProperty("exec_cmd")
- private String execCmd;
-
- @JsonProperty("interrupt_cmd")
- private String InterruptCmd;
-
- @JsonProperty("exec_start_time")
- private long execStartTime;
- @JsonProperty("exec_end_time")
- private long execEndTime;
- @JsonProperty("exec_wait_time")
- private long execWaitTime;
-
- @JsonProperty("step_status")
- private JobStepStatusEnum status;
-
- @JsonProperty("cmd_type")
- private JobStepCmdTypeEnum cmdType = JobStepCmdTypeEnum.SHELL_CMD_HADOOP;
-
- @JsonProperty("info")
- private ConcurrentHashMap<String, String> info = new ConcurrentHashMap<String, String>();
-
- @JsonProperty("run_async")
- private boolean runAsync = false;
-
- private ConcurrentHashMap<String, String> getInfo() {
- return info;
- }
-
- public void putInfo(String key, String value) {
- getInfo().put(key, value);
- }
-
- public String getInfo(String key) {
- return getInfo().get(key);
- }
-
- public void clearInfo() {
- getInfo().clear();
- }
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public int getSequenceID() {
- return sequenceID;
- }
-
- public void setSequenceID(int sequenceID) {
- this.sequenceID = sequenceID;
- }
-
- public String getExecCmd() {
- return execCmd;
- }
-
- public void setExecCmd(String execCmd) {
- this.execCmd = execCmd;
- }
-
- public JobStepStatusEnum getStatus() {
- return status;
- }
-
- public void setStatus(JobStepStatusEnum status) {
- this.status = status;
- }
-
-
-
- public String getId() {
- return id;
- }
-
- public void setId(String id) {
- this.id = id;
- }
-
- /**
- * @return the execStartTime
- */
- public long getExecStartTime() {
- return execStartTime;
- }
-
- /**
- * @param execStartTime the execStartTime to set
- */
- public void setExecStartTime(long execStartTime) {
- this.execStartTime = execStartTime;
- }
-
- /**
- * @return the execEndTime
- */
- public long getExecEndTime() {
- return execEndTime;
- }
-
- /**
- * @param execEndTime the execEndTime to set
- */
- public void setExecEndTime(long execEndTime) {
- this.execEndTime = execEndTime;
- }
-
- public long getExecWaitTime() {
- return execWaitTime;
- }
-
- public void setExecWaitTime(long execWaitTime) {
- this.execWaitTime = execWaitTime;
- }
-
- public String getInterruptCmd() {
- return InterruptCmd;
- }
-
- public void setInterruptCmd(String interruptCmd) {
- InterruptCmd = interruptCmd;
- }
-
- public JobStepCmdTypeEnum getCmdType() {
- return cmdType;
- }
-
- public void setCmdType(JobStepCmdTypeEnum cmdType) {
- this.cmdType = cmdType;
- }
-
- /**
- * @return the runAsync
- */
- public boolean isRunAsync() {
- return runAsync;
- }
-
- /**
- * @param runAsync the runAsync to set
- */
- public void setRunAsync(boolean runAsync) {
- this.runAsync = runAsync;
- }
-
- /**
- * @return the jobInstance
- */
- public JobInstance getJobInstance() {
- return jobInstance;
- }
-
- /**
-  * Hash over the same two fields that {@code equals} compares: name and sequenceID.
-  */
- @Override
- public int hashCode() {
-     int nameHash = (name != null) ? name.hashCode() : 0;
-     // Same arithmetic as the usual "result = 31 * result + field" chain seeded with 1.
-     return 31 * (31 + nameHash) + sequenceID;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- JobStep other = (JobStep) obj;
- if (name == null) {
- if (other.name != null)
- return false;
- } else if (!name.equals(other.name))
- return false;
- if (sequenceID != other.sequenceID)
- return false;
- return true;
- }
-
- @Override
- public int compareTo(JobStep o) {
- if (this.sequenceID < o.sequenceID) {
- return -1;
- } else if (this.sequenceID > o.sequenceID) {
- return 1;
- } else {
- return 0;
- }
- }
- }
-
- /**
-  * Orders job instances by descending lastModified, so the most recently
-  * modified instance sorts first.
-  */
- @Override
- public int compareTo(JobInstance o) {
-     if (o.lastModified < this.lastModified) {
-         return -1;
-     }
-     if (o.lastModified > this.lastModified) {
-         return 1;
-     }
-     return 0;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java b/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
deleted file mode 100644
index eba6bd4..0000000
--- a/job/src/main/java/org/apache/kylin/job/JoinedFlatTable.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-import javax.xml.parsers.ParserConfigurationException;
-
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.hadoop.hive.SqlHiveDataTypeMapping;
-import org.apache.kylin.metadata.model.DataModelDesc;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IntermediateColumnDesc;
-import org.apache.kylin.metadata.model.JoinDesc;
-import org.apache.kylin.metadata.model.LookupDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-import org.w3c.dom.Document;
-import org.w3c.dom.NodeList;
-import org.xml.sax.SAXException;
-
-/**
- * @author George Song (ysong1)
- *
- */
-
-public class JoinedFlatTable {
-
- public static final String FACT_TABLE_ALIAS = "FACT_TABLE";
-
- public static final String LOOKUP_TABLE_ALAIS_PREFIX = "LOOKUP_";
-
- /**
-  * Returns the storage directory of the intermediate table:
-  * {@code storageDfsDir + "/" + <table name>}.
-  */
- public static String getTableDir(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
- return storageDfsDir + "/" + intermediateTableDesc.getTableName();
- }
-
- public static String generateCreateTableStatement(IJoinedFlatTableDesc intermediateTableDesc, String storageDfsDir) {
- StringBuilder ddl = new StringBuilder();
-
- ddl.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediateTableDesc.getTableName() + "\n");
-
- ddl.append("(" + "\n");
- for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
- IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
- if (i > 0) {
- ddl.append(",");
- }
- ddl.append(colName(col.getCanonicalName()) + " " + SqlHiveDataTypeMapping.getHiveDataType(col.getDataType()) + "\n");
- }
- ddl.append(")" + "\n");
-
- ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\177'" + "\n");
- ddl.append("STORED AS SEQUENCEFILE" + "\n");
- ddl.append("LOCATION '" + storageDfsDir + "/" + intermediateTableDesc.getTableName() + "';").append("\n");
- // ddl.append("TBLPROPERTIES ('serialization.null.format'='\\\\N')" +
- // ";\n");
- return ddl.toString();
- }
-
- /**
-  * Generates {@code DROP TABLE IF EXISTS <table name>;} followed by a newline.
-  */
- public static String generateDropTableStatement(IJoinedFlatTableDesc intermediateTableDesc) {
-     String tableName = intermediateTableDesc.getTableName();
-     return "DROP TABLE IF EXISTS " + tableName + ";" + "\n";
- }
-
- /**
-  * Generates the Hive script that fills the intermediate flat table.
-  *
-  * If the file at {@code engineConfig.getHiveConfFilePath()} exists, each of its
-  * {@code <property>} elements is turned into a {@code SET name=value;} line
-  * (except {@code tmpjars}), followed by the {@code INSERT OVERWRITE TABLE ... SELECT ...}
-  * statement.
-  *
-  * Name and value are read from inside each {@code <property>} element itself, so a
-  * property without a {@code <value>} child no longer shifts the pairing of later
-  * properties, and an empty {@code <value/>} yields an empty value rather than a
-  * NullPointerException. Properties without a {@code <name>} are skipped.
-  *
-  * @throws IOException if the configuration file cannot be read or parsed
-  */
- public static String generateInsertDataStatement(IJoinedFlatTableDesc intermediateTableDesc, JobEngineConfig engineConfig) throws IOException {
-     StringBuilder sql = new StringBuilder();
-
-     File hadoopPropertiesFile = new File(engineConfig.getHiveConfFilePath());
-
-     if (hadoopPropertiesFile.exists()) {
-         Document doc;
-         try {
-             DocumentBuilder builder = DocumentBuilderFactory.newInstance().newDocumentBuilder();
-             doc = builder.parse(hadoopPropertiesFile);
-         } catch (ParserConfigurationException e) {
-             throw new IOException(e);
-         } catch (SAXException e) {
-             throw new IOException(e);
-         }
-
-         NodeList properties = doc.getElementsByTagName("property");
-         for (int i = 0; i < properties.getLength(); i++) {
-             // getElementsByTagName only returns element nodes, so the cast is safe.
-             org.w3c.dom.Element property = (org.w3c.dom.Element) properties.item(i);
-
-             NodeList names = property.getElementsByTagName("name");
-             if (names.getLength() == 0) {
-                 continue; // nothing to SET without a name
-             }
-             String name = names.item(0).getTextContent();
-
-             NodeList values = property.getElementsByTagName("value");
-             String value = values.getLength() > 0 ? values.item(0).getTextContent() : "";
-
-             if (!"tmpjars".equals(name)) {
-                 sql.append("SET " + name + "=" + value + ";").append("\n");
-             }
-         }
-     }
-
-     sql.append("INSERT OVERWRITE TABLE " + intermediateTableDesc.getTableName() + " " + generateSelectDataStatement(intermediateTableDesc) + ";").append("\n");
-
-     return sql.toString();
- }
-
- /**
-  * Generates the {@code SELECT ... FROM ... JOIN ... WHERE ...} statement that produces
-  * the rows of the intermediate flat table. Every column is emitted as
-  * {@code <table alias>.<column name>}, one per line.
-  */
- public static String generateSelectDataStatement(IJoinedFlatTableDesc intermediateTableDesc) {
-     StringBuilder sql = new StringBuilder();
-     sql.append("SELECT" + "\n");
-     Map<String, String> tableAliasMap = buildTableAliasMap(intermediateTableDesc.getDataModel());
-     for (int i = 0; i < intermediateTableDesc.getColumnList().size(); i++) {
-         IntermediateColumnDesc col = intermediateTableDesc.getColumnList().get(i);
-         if (i > 0) {
-             sql.append(",");
-         }
-         // buildTableAliasMap keys its entries by upper-cased table name, so the
-         // lookup must use the same normalization to find the alias.
-         String tableName = col.getTableName();
-         String tableAlias = tableAliasMap.get(tableName == null ? null : tableName.toUpperCase());
-         sql.append(tableAlias + "." + col.getColumnName() + "\n");
-     }
-     appendJoinStatement(intermediateTableDesc, sql, tableAliasMap);
-     appendWhereStatement(intermediateTableDesc, sql, tableAliasMap);
-     return sql.toString();
- }
-
- /**
-  * Maps upper-cased table names to SQL aliases: the fact table gets
-  * {@code FACT_TABLE_ALIAS}; each lookup that has a join gets
-  * {@code LOOKUP_TABLE_ALAIS_PREFIX} followed by a counter starting at 1.
-  */
- private static Map<String, String> buildTableAliasMap(DataModelDesc dataModelDesc) {
-     Map<String, String> aliasByTable = new HashMap<String, String>();
-     aliasByTable.put(dataModelDesc.getFactTable().toUpperCase(), FACT_TABLE_ALIAS);
-
-     int lookupSeq = 0;
-     for (LookupDesc lookup : dataModelDesc.getLookups()) {
-         if (lookup.getJoin() == null) {
-             continue; // lookups without a join get no alias and do not consume a number
-         }
-         String tableKey = lookup.getTable().toUpperCase();
-         lookupSeq++;
-         aliasByTable.put(tableKey, LOOKUP_TABLE_ALAIS_PREFIX + lookupSeq);
-     }
-     return aliasByTable;
- }
-
- /**
-  * Appends the {@code FROM <fact table>} clause and one {@code <TYPE> JOIN ... ON ...}
-  * clause per distinct lookup table that has a non-empty join type.
-  *
-  * Aliases are looked up by upper-cased table name because that is how
-  * buildTableAliasMap keys them; looking up the raw name would return null for any
-  * table name that is not already upper case.
-  *
-  * @throws RuntimeException if a join has a different number of primary and foreign key columns
-  */
- private static void appendJoinStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-     Set<String> dimTableCache = new HashSet<String>();
-
-     DataModelDesc dataModelDesc = intermediateTableDesc.getDataModel();
-     String factTableName = dataModelDesc.getFactTable();
-     String factTableAlias = tableAliasMap.get(factTableName.toUpperCase());
-     sql.append("FROM " + factTableName + " as " + factTableAlias + " \n");
-
-     for (LookupDesc lookupDesc : dataModelDesc.getLookups()) {
-         JoinDesc join = lookupDesc.getJoin();
-         if (join == null || join.getType().equals("")) {
-             continue;
-         }
-         String dimTableName = lookupDesc.getTable();
-         if (dimTableCache.contains(dimTableName)) {
-             continue; // each lookup table is joined only once
-         }
-
-         TblColRef[] pk = join.getPrimaryKeyColumns();
-         TblColRef[] fk = join.getForeignKeyColumns();
-         if (pk.length != fk.length) {
-             throw new RuntimeException("Invalid join condition of lookup table:" + lookupDesc);
-         }
-
-         String joinType = join.getType().toUpperCase();
-         String dimTableAlias = tableAliasMap.get(dimTableName.toUpperCase());
-         sql.append(joinType + " JOIN " + dimTableName + " as " + dimTableAlias + "\n");
-         sql.append("ON ");
-         for (int i = 0; i < pk.length; i++) {
-             if (i > 0) {
-                 sql.append(" AND ");
-             }
-             sql.append(factTableAlias + "." + fk[i].getName() + " = " + dimTableAlias + "." + pk[i].getName());
-         }
-         sql.append("\n");
-
-         dimTableCache.add(dimTableName);
-     }
- }
-
- /**
-  * Appends a WHERE clause built from the model's filter condition and the cube
-  * segment's date range. Nothing is appended when the descriptor is not a
-  * CubeJoinedFlatTableDesc or when neither condition applies.
-  */
- private static void appendWhereStatement(IJoinedFlatTableDesc intermediateTableDesc, StringBuilder sql, Map<String, String> tableAliasMap) {
-     if (!(intermediateTableDesc instanceof CubeJoinedFlatTableDesc)) {
-         return;//TODO: for now only cube segments support filter and partition
-     }
-     CubeJoinedFlatTableDesc cubeFlatDesc = (CubeJoinedFlatTableDesc) intermediateTableDesc;
-     CubeDesc cubeDesc = cubeFlatDesc.getCubeDesc();
-     DataModelDesc model = cubeDesc.getModel();
-
-     StringBuilder where = new StringBuilder("WHERE");
-     boolean conditionAdded = false;
-
-     // Part 1: the model-level filter condition, if one is configured.
-     if (model.getFilterCondition() != null && !model.getFilterCondition().equals("")) {
-         where.append(" (");
-         where.append(model.getFilterCondition());
-         where.append(") ");
-         conditionAdded = true;
-     }
-
-     // Part 2: the segment's date range, unless it spans [0, Long.MAX_VALUE].
-     CubeSegment segment = cubeFlatDesc.getCubeSegment();
-     if (segment != null) {
-         PartitionDesc partDesc = model.getPartitionDesc();
-         long dateStart = segment.getDateRangeStart();
-         long dateEnd = segment.getDateRangeEnd();
-
-         boolean unbounded = (dateStart == 0 && dateEnd == Long.MAX_VALUE);
-         if (!unbounded) {
-             if (conditionAdded) {
-                 where.append(" AND (");
-             } else {
-                 where.append(" (");
-             }
-             where.append(partDesc.getPartitionConditionBuilder().buildDateRangeCondition(partDesc, dateStart, dateEnd, tableAliasMap));
-             where.append(")\n");
-             conditionAdded = true;
-         }
-     }
-
-     if (conditionAdded) {
-         sql.append(where.toString());
-     }
- }
-
- /**
-  * Turns a canonical column name into a flat-table column name by replacing
-  * every '.' with '_'.
-  */
- private static String colName(String canonicalColName) {
-     return canonicalColName.replace('.', '_');
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/Scheduler.java b/job/src/main/java/org/apache/kylin/job/Scheduler.java
deleted file mode 100644
index 2ed2fc2..0000000
--- a/job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.lock.JobLock;
-
-/**
- * Contract for a job scheduler parameterized by the kind of {@link Executable} it handles.
- * Every operation reports failure through {@link SchedulerException}.
- */
-public interface Scheduler<T extends Executable> {
-
- /**
-  * Initializes the scheduler with the given engine configuration and job lock.
-  * NOTE(review): presumably must be called before any other method - confirm against implementations.
-  */
- void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
-
- /** Shuts the scheduler down. */
- void shutdown() throws SchedulerException;
-
- /**
-  * Requests that the given executable be stopped.
-  * NOTE(review): the meaning of the boolean result is not visible here - presumably true when the stop succeeded; confirm.
-  */
- boolean stop(T executable) throws SchedulerException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
deleted file mode 100644
index 29b5324..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-/**
- * Base class for command outputs that routes every logged message into the
- * output buffer via {@code appendOutput}.
- */
-public abstract class BaseCommandOutput implements ICommandOutput {
-
-    /** Logging a message is the same as appending it to the command output. */
-    @Override
-    public void log(String message) {
-        appendOutput(message);
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
deleted file mode 100644
index 6cab6a3..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * Holder for the result of running a command: a step status, the accumulated
- * output text and an exit code. Also usable as a {@link Logger}.
- *
- * @author xjiang
- */
-public interface ICommandOutput extends Logger {
-
-    /** Sets the status of the step this output belongs to. */
-    void setStatus(JobStepStatusEnum status);
-
-    /** Returns the status previously set. */
-    JobStepStatusEnum getStatus();
-
-    /** Appends a message to the accumulated output. */
-    void appendOutput(String message);
-
-    /** Returns the accumulated output. */
-    String getOutput();
-
-    /** Records the exit code of the command. */
-    void setExitCode(int exitCode);
-
-    /** Returns the recorded exit code. */
-    int getExitCode();
-
-    /** Resets this output so it can be reused. */
-    void reset();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
deleted file mode 100644
index 5a47173..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.exception.JobException;
-
-/**
- * @author xjiang
- *
- */
-public interface IJobCommand {
-
- public ICommandOutput execute() throws JobException;
-
- public void cancel() throws JobException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
deleted file mode 100644
index 6a718fc..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.*;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmd implements IJobCommand {
-
- private static Logger log = LoggerFactory.getLogger(ShellCmd.class);
-
- private final String executeCommand;
- private final ICommandOutput output;
- private final boolean isAsync;
- private final CliCommandExecutor cliCommandExecutor;
-
- private FutureTask<Integer> future;
-
- private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
- this.executeCommand = executeCmd;
- this.output = out;
- this.cliCommandExecutor = new CliCommandExecutor();
- this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
- this.isAsync = async;
- }
-
- public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
- this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
- }
-
- @Override
- public ICommandOutput execute() throws JobException {
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- future = new FutureTask<Integer>(new Callable<Integer>() {
- public Integer call() throws JobException, IOException {
- executor.shutdown();
- return executeCommand(executeCommand);
- }
- });
- executor.execute(future);
-
- int exitCode = -1;
- if (!isAsync) {
- try {
- exitCode = future.get();
- log.info("finish executing");
- } catch (CancellationException e) {
- log.debug("Command is cancelled");
- exitCode = -2;
- } catch (Exception e) {
- throw new JobException("Error when execute job " + executeCommand, e);
- } finally {
- if (exitCode == 0) {
- output.setStatus(JobStepStatusEnum.FINISHED);
- } else if (exitCode == -2) {
- output.setStatus(JobStepStatusEnum.DISCARDED);
- } else {
- output.setStatus(JobStepStatusEnum.ERROR);
- }
- output.setExitCode(exitCode);
- }
- }
- return output;
- }
-
- protected int executeCommand(String command) throws JobException, IOException {
- output.reset();
- output.setStatus(JobStepStatusEnum.RUNNING);
- return cliCommandExecutor.execute(command, output).getFirst();
- }
-
- @Override
- public void cancel() throws JobException {
- future.cancel(true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index ebcad47..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
- protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
-
- protected StringBuilder output;
- protected int exitCode;
- protected JobStepStatusEnum status;
-
- public ShellCmdOutput() {
- init();
- }
-
- private void init() {
- output = new StringBuilder();
- exitCode = -1;
- status = JobStepStatusEnum.NEW;
- }
-
- @Override
- public JobStepStatusEnum getStatus() {
- return status;
- }
-
- @Override
- public void setStatus(JobStepStatusEnum s) {
- this.status = s;
- }
-
- @Override
- public String getOutput() {
- return output.toString();
- }
-
- @Override
- public void appendOutput(String message) {
- output.append(message).append(System.getProperty("line.separator"));
- log.debug(message);
- }
-
- @Override
- public int getExitCode() {
- return exitCode;
- }
-
- @Override
- public void setExitCode(int code) {
- exitCode = code;
- }
-
- @Override
- public void reset() {
- init();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java b/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
deleted file mode 100644
index 288fd31..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author xduo
- *
- */
-public class HadoopCmdOutput {
-
- protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
-
- private final StringBuilder output;
- private final Job job;
-
- public HadoopCmdOutput(Job job, StringBuilder output) {
- super();
- this.job = job;
- this.output = output;
- }
-
- public String getMrJobId() {
- return getInfo().get(ExecutableConstants.MR_JOB_ID);
- }
-
- public Map<String, String> getInfo() {
- if (job != null) {
- Map<String, String> status = new HashMap<String, String>();
- if (null != job.getJobID()) {
- status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
- }
- if (null != job.getTrackingURL()) {
- status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString());
- }
- return status;
- } else {
- return Collections.emptyMap();
- }
- }
-
- private String mapInputRecords;
- private String hdfsBytesWritten;
- private String hdfsBytesRead;
-
- public String getMapInputRecords() {
- return mapInputRecords;
- }
-
- public String getHdfsBytesWritten() {
- return hdfsBytesWritten;
- }
-
- public String getHdfsBytesRead() {
- return hdfsBytesRead;
- }
-
- public void updateJobCounter() {
- try {
- Counters counters = job.getCounters();
- if (counters == null) {
- String errorMsg = "no counters for job " + getMrJobId();
- log.warn(errorMsg);
- output.append(errorMsg);
- return;
- }
- this.output.append(counters.toString()).append("\n");
- log.debug(counters.toString());
-
- mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
- hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
- hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
- } catch (Exception e) {
- log.error(e.getLocalizedMessage(), e);
- output.append(e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
deleted file mode 100644
index 2da5b2a..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.util.ToolRunner;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-/**
- */
-public class HadoopShellExecutable extends AbstractExecutable {
-
- private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS";
- private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS";
-
- public HadoopShellExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final String mapReduceJobClass = getJobClass();
- String params = getJobParams();
- Preconditions.checkNotNull(mapReduceJobClass);
- Preconditions.checkNotNull(params);
- try {
- final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
- final AbstractHadoopJob job = constructor.newInstance();
- String[] args = params.trim().split("\\s+");
- logger.info("parameters of the HadoopShellExecutable:");
- logger.info(params);
- int result;
- StringBuilder log = new StringBuilder();
- try {
- result = ToolRunner.run(job, args);
- } catch (Exception ex) {
- logger.error("error execute " + this.toString(), ex);
- StringWriter stringWriter = new StringWriter();
- ex.printStackTrace(new PrintWriter(stringWriter));
- log.append(stringWriter.toString()).append("\n");
- result = 2;
- }
- log.append("result code:").append(result);
- return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()):new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
- } catch (ReflectiveOperationException e) {
- logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } catch (Exception e) {
- logger.error("error execute " + this.toString(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) {
- setParam(KEY_MR_JOB, clazzName.getName());
- }
-
- public String getJobClass() throws ExecuteException {
- return getParam(KEY_MR_JOB);
- }
-
- public void setJobParams(String param) {
- setParam(KEY_PARAMS, param);
- }
-
- public String getJobParams() {
- return getParam(KEY_PARAMS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
deleted file mode 100644
index ffe45ed..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.hive.HiveClient;
-import org.datanucleus.store.types.backed.HashMap;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.Lists;
-
-import org.apache.kylin.common.util.JsonUtil;
-
-/**
- */
-public class HqlExecutable extends AbstractExecutable {
-
- private static final String HQL = "hql";
- private static final String HIVE_CONFIG = "hive-config";
-
- public HqlExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- try {
- Map<String, String> configMap = getConfiguration();
- HiveClient hiveClient = new HiveClient(configMap);
-
- for (String hql: getHqls()) {
- hiveClient.executeHQL(hql);
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED);
- } catch (Exception e) {
- logger.error("error run hive query:" + getHqls(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setConfiguration(Map<String, String> configMap) {
- if(configMap != null) {
- String configStr = "";
- try {
- configStr = JsonUtil.writeValueAsString(configMap);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- setParam(HIVE_CONFIG, configStr);
- }
- }
-
-
- @SuppressWarnings("unchecked")
- private Map<String, String> getConfiguration() {
- String configStr = getParam(HIVE_CONFIG);
- Map<String, String> result = null;
- if(configStr != null) {
- try {
- result = JsonUtil.readValue(configStr, HashMap.class);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- return result;
- }
-
- public void setHqls(List<String> hqls) {
- setParam(HQL, StringUtils.join(hqls, ";"));
- }
-
- private List<String> getHqls() {
- final String hqls = getParam(HQL);
- if (hqls != null) {
- return Lists.newArrayList(StringUtils.split(hqls, ";"));
- } else {
- return Collections.emptyList();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
deleted file mode 100644
index f8eab6c..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.tools.HadoopStatusChecker;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class MapReduceExecutable extends AbstractExecutable {
-
- private static final String KEY_MR_JOB = "MR_JOB_CLASS";
- private static final String KEY_PARAMS = "MR_JOB_PARAMS";
- private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";
-
- public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-
- public MapReduceExecutable() {
- super();
- }
-
- @Override
- protected void onExecuteStart(ExecutableContext executableContext) {
- final Output output = executableManager.getOutput(getId());
- if (output.getExtra().containsKey(START_TIME)) {
- final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
- if (mrJobId == null) {
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
- return;
- }
- try {
- Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
- if (job.getJobState() == JobStatus.State.FAILED) {
- //remove previous mr job info
- super.onExecuteStart(executableContext);
- } else {
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
- }
- } catch (IOException e) {
- logger.warn("error get hadoop status");
- super.onExecuteStart(executableContext);
- } catch (InterruptedException e) {
- logger.warn("error get hadoop status");
- super.onExecuteStart(executableContext);
- }
- } else {
- super.onExecuteStart(executableContext);
- }
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final String mapReduceJobClass = getMapReduceJobClass();
- String params = getMapReduceParams();
- Preconditions.checkNotNull(mapReduceJobClass);
- Preconditions.checkNotNull(params);
- try {
- Job job;
- final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
- if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
- job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
- logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
- } else {
- final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
- final AbstractHadoopJob hadoopJob = constructor.newInstance();
- hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
- logger.info("parameters of the MapReduceExecutable:");
- logger.info(params);
- String[] args = params.trim().split("\\s+");
- try {
- //for async mr job, ToolRunner just return 0;
- ToolRunner.run(hadoopJob, args);
- } catch (Exception ex) {
- StringBuilder log = new StringBuilder();
- logger.error("error execute " + this.toString(), ex);
- StringWriter stringWriter = new StringWriter();
- ex.printStackTrace(new PrintWriter(stringWriter));
- log.append(stringWriter.toString()).append("\n");
- log.append("result code:").append(2);
- return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
- }
- job = hadoopJob.getJob();
- }
- final StringBuilder output = new StringBuilder();
- final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
-
- final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
- if (restStatusCheckUrl == null) {
- logger.error("restStatusCheckUrl is null");
- return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
- }
- String mrJobId = hadoopCmdOutput.getMrJobId();
- HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
- JobStepStatusEnum status = JobStepStatusEnum.NEW;
- while (!isDiscarded()) {
- JobStepStatusEnum newStatus = statusChecker.checkStatus();
- if (status == JobStepStatusEnum.KILLED) {
- executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String>emptyMap(), "killed by admin");
- return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
- }
- if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
- final long waitTime = System.currentTimeMillis() - getStartTime();
- setMapReduceWaitTime(waitTime);
- }
- status = newStatus;
- executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
- if (status.isComplete()) {
- final Map<String, String> info = hadoopCmdOutput.getInfo();
- readCounters(hadoopCmdOutput, info);
- executableManager.addJobInfo(getId(), info);
-
- if (status == JobStepStatusEnum.FINISHED) {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
- } else {
- return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
- }
- }
- Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
- }
- //TODO kill discarded mr job using "hadoop job -kill " + mrJobId
-
- return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
-
- } catch (ReflectiveOperationException e) {
- logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } catch (Exception e) {
- logger.error("error execute " + this.toString(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
- hadoopCmdOutput.updateJobCounter();
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
-
- String saveAs = getParam(KEY_COUNTER_SAVEAS);
- if (saveAs != null) {
- String[] saveAsNames = saveAs.split(",");
- saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
- saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
- saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
- }
- }
-
- private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
- if (saveAsNames.length > i && StringUtils.isBlank(saveAsNames[i]) == false) {
- info.put(saveAsNames[i].trim(), counter);
- }
- }
-
- private String getRestStatusCheckUrl(Job job, KylinConfig config) {
- final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
- if (yarnStatusCheckUrl != null) {
- return yarnStatusCheckUrl;
- } else {
- logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
- }
- String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
- if (StringUtils.isEmpty(rmWebHost)) {
- return null;
- }
- if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
- //do nothing
- } else {
- rmWebHost = "http://" + rmWebHost;
- }
- logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
- return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
- }
-
- public long getMapReduceWaitTime() {
- return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
- }
-
- public void setMapReduceWaitTime(long t) {
- addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
- }
-
- public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
- setParam(KEY_MR_JOB, clazzName.getName());
- }
-
- public String getMapReduceJobClass() throws ExecuteException {
- return getParam(KEY_MR_JOB);
- }
-
- public void setMapReduceParams(String param) {
- setParam(KEY_PARAMS, param);
- }
-
- public String getMapReduceParams() {
- return getParam(KEY_PARAMS);
- }
-
- public String getCounterSaveAs() {
- return getParam(KEY_COUNTER_SAVEAS);
- }
-
- public void setCounterSaveAs(String value) {
- setParam(KEY_COUNTER_SAVEAS, value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
deleted file mode 100644
index 786698e..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.util.Pair;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- */
-public class ShellExecutable extends AbstractExecutable {
-
- private static final String CMD = "cmd";
-
- public ShellExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- try {
- logger.info("executing:" + getCmd());
- final ShellExecutableLogger logger = new ShellExecutableLogger();
- final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
- executableManager.addJobInfo(getId(), logger.getInfo());
- return new ExecuteResult(result.getFirst() == 0? ExecuteResult.State.SUCCEED: ExecuteResult.State.FAILED, result.getSecond());
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setCmd(String cmd) {
- setParam(CMD, cmd);
- }
-
- public String getCmd() {
- return getParam(CMD);
- }
-
- private static class ShellExecutableLogger implements Logger {
-
- private final Map<String, String> info = Maps.newHashMap();
-
- private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
- private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
- private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
- private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
-
- // hive
- private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
- private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
- @Override
- public void log(String message) {
- Matcher matcher = PATTERN_APP_ID.matcher(message);
- if (matcher.find()) {
- String appId = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_ID, appId);
- }
-
- matcher = PATTERN_APP_URL.matcher(message);
- if (matcher.find()) {
- String appTrackingUrl = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
- }
-
- matcher = PATTERN_JOB_ID.matcher(message);
- if (matcher.find()) {
- String mrJobID = matcher.group(1);
- info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
- }
-
- matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- String hdfsWritten = matcher.group(1);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
- if (matcher.find()) {
- String sourceCount = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
- if (matcher.find()) {
- String sourceSize = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
- }
-
- // hive
- matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
- if (matcher.find()) {
- String jobId = matcher.group(1);
- String trackingUrl = matcher.group(2);
- info.put(ExecutableConstants.MR_JOB_ID, jobId);
- info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
- }
-
- matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- // String hdfsRead = matcher.group(1);
- String hdfsWritten = matcher.group(2);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
- }
-
- Map<String, String> getInfo() {
- return info;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
deleted file mode 100644
index 3a64d02..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public interface BatchConstants {
-
- public static final char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
-
- public static final String CFG_CUBE_NAME = "cube.name";
- public static final String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
-
- public static final String CFG_II_NAME = "ii.name";
- public static final String CFG_II_SEGMENT_NAME = "ii.segment.name";
-
- public static final String INPUT_DELIM = "input.delim";
- public static final String OUTPUT_PATH = "output.path";
-
- public static final String TABLE_NAME = "table.name";
- public static final String TABLE_COLUMNS = "table.columns";
-
- public static final String CFG_IS_MERGE = "is.merge";
- public static final String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
-
- public static final String MAPREDUCE_COUTNER_GROUP_NAME = "Cube Builder";
-
- public static final String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
- public static final String REGION_NUMBER = "region.number";
- public static final String CUBE_CAPACITY = "cube.capacity";
-
- public static final String CFG_STATISTICS_ENABLED = "statistics.enabled";
- public static final String CFG_STATISTICS_OUTPUT = "statistics.ouput";
- public static final String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
- public static final String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
- public static final String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
-
- public static final int COUNTER_MAX = 100000;
- public static final int ERROR_RECORD_THRESHOLD = 100;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
deleted file mode 100644
index fdcfdbe..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- */
-public final class ExecutableConstants {
-
- private ExecutableConstants(){}
-
- public static final String YARN_APP_ID = "yarn_application_id";
-
- public static final String YARN_APP_URL = "yarn_application_tracking_url";
- public static final String MR_JOB_ID = "mr_job_id";
- public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
- public static final String SOURCE_RECORDS_COUNT = "source_records_count";
- public static final String SOURCE_RECORDS_SIZE = "source_records_size";
- public static final String GLOBAL_LISTENER_NAME = "ChainListener";
-
-
-
-
- public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
-
- public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
-
- public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
- public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
-
- public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
- public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
- public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
- public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";
- public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
- public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
- public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
- public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
- public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
- public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
- public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
- public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
- public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
- public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
- public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
-
- public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
- public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
-
- public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
- public static final String PROP_JOB_FLOW = "jobFlow";
- public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
- public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
- public static final String PROP_COMMAND = "command";
- // public static final String PROP_STORAGE_LOCATION =
- // "storageLocationIdentifier";
- public static final String PROP_JOB_ASYNC = "jobAsync";
- public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
- public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
- public static final String PROP_JOB_KILLED = "jobKilled";
- public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
- public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
deleted file mode 100644
index a4ef564..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-public enum JobStatusEnum {
-
- NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
-
- private final int code;
-
- private JobStatusEnum(int statusCode) {
- this.code = statusCode;
- }
-
- public static JobStatusEnum getByCode(int statusCode) {
- for (JobStatusEnum status : values()) {
- if (status.getCode() == statusCode) {
- return status;
- }
- }
-
- return null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public boolean isComplete() {
- return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
deleted file mode 100644
index 02b40a3..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author xduo, ysong1
- *
- */
-public enum JobStepCmdTypeEnum {
- SHELL_CMD, SHELL_CMD_HADOOP, JAVA_CMD_HADOOP_FACTDISTINCT, JAVA_CMD_HADOOP_BASECUBOID, JAVA_CMD_HADOOP_NDCUBOID, JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, JAVA_CMD_HADOOP_CONVERTHFILE, JAVA_CMD_HADOOP_MERGECUBOID, JAVA_CMD_HADOOP_NO_MR_DICTIONARY, JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, JAVA_CMD_HADOOP_NO_MR_BULKLOAD
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/1776fc0f/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
deleted file mode 100644
index 08ee79a..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-public enum JobStepStatusEnum {
- NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64);
-
- private final int code;
-
- private JobStepStatusEnum(int statusCode) {
- this.code = statusCode;
- }
-
- public static JobStepStatusEnum getByCode(int statusCode) {
- for (JobStepStatusEnum status : values()) {
- if (status.getCode() == statusCode) {
- return status;
- }
- }
-
- return null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public boolean isComplete() {
- return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode();
- }
-
- public boolean isRunable() {
- return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode();
- }
-}