You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/23 13:21:49 UTC
[12/23] incubator-kylin git commit: KYLIN-875 half way
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/Scheduler.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/Scheduler.java b/job/src/main/java/org/apache/kylin/job/Scheduler.java
deleted file mode 100644
index 2ed2fc2..0000000
--- a/job/src/main/java/org/apache/kylin/job/Scheduler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job;
-
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.exception.SchedulerException;
-import org.apache.kylin.job.execution.Executable;
-import org.apache.kylin.job.lock.JobLock;
-
-/**
- */
-public interface Scheduler<T extends Executable> {
-
- void init(JobEngineConfig jobEngineConfig, JobLock jobLock) throws SchedulerException;
-
- void shutdown() throws SchedulerException;
-
- boolean stop(T executable) throws SchedulerException;
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
deleted file mode 100644
index 29b5324..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/BaseCommandOutput.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-/**
- */
-public abstract class BaseCommandOutput implements ICommandOutput {
-
- @Override
- public void log(String message) {
- this.appendOutput(message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
deleted file mode 100644
index 6cab6a3..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ICommandOutput.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- *
- */
-public interface ICommandOutput extends Logger {
-
- public void setStatus(JobStepStatusEnum status);
-
- public JobStepStatusEnum getStatus();
-
- public void appendOutput(String message);
-
- public String getOutput();
-
- public void setExitCode(int exitCode);
-
- public int getExitCode();
-
- public void reset();
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java b/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
deleted file mode 100644
index 5a47173..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/IJobCommand.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.job.exception.JobException;
-
-/**
- * @author xjiang
- *
- */
-public interface IJobCommand {
-
- public ICommandOutput execute() throws JobException;
-
- public void cancel() throws JobException;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
deleted file mode 100644
index 6a718fc..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmd.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.apache.kylin.common.util.CliCommandExecutor;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.JobException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.concurrent.*;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmd implements IJobCommand {
-
- private static Logger log = LoggerFactory.getLogger(ShellCmd.class);
-
- private final String executeCommand;
- private final ICommandOutput output;
- private final boolean isAsync;
- private final CliCommandExecutor cliCommandExecutor;
-
- private FutureTask<Integer> future;
-
- private ShellCmd(String executeCmd, ICommandOutput out, String host, int port, String user, String password, boolean async) {
- this.executeCommand = executeCmd;
- this.output = out;
- this.cliCommandExecutor = new CliCommandExecutor();
- this.cliCommandExecutor.setRunAtRemote(host, port, user, password);
- this.isAsync = async;
- }
-
- public ShellCmd(String executeCmd, String host, int port, String user, String password, boolean async) {
- this(executeCmd, new ShellCmdOutput(), host, port, user, password, async);
- }
-
- @Override
- public ICommandOutput execute() throws JobException {
-
- final ExecutorService executor = Executors.newSingleThreadExecutor();
- future = new FutureTask<Integer>(new Callable<Integer>() {
- public Integer call() throws JobException, IOException {
- executor.shutdown();
- return executeCommand(executeCommand);
- }
- });
- executor.execute(future);
-
- int exitCode = -1;
- if (!isAsync) {
- try {
- exitCode = future.get();
- log.info("finish executing");
- } catch (CancellationException e) {
- log.debug("Command is cancelled");
- exitCode = -2;
- } catch (Exception e) {
- throw new JobException("Error when execute job " + executeCommand, e);
- } finally {
- if (exitCode == 0) {
- output.setStatus(JobStepStatusEnum.FINISHED);
- } else if (exitCode == -2) {
- output.setStatus(JobStepStatusEnum.DISCARDED);
- } else {
- output.setStatus(JobStepStatusEnum.ERROR);
- }
- output.setExitCode(exitCode);
- }
- }
- return output;
- }
-
- protected int executeCommand(String command) throws JobException, IOException {
- output.reset();
- output.setStatus(JobStepStatusEnum.RUNNING);
- return cliCommandExecutor.execute(command, output).getFirst();
- }
-
- @Override
- public void cancel() throws JobException {
- future.cancel(true);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java b/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
deleted file mode 100644
index ebcad47..0000000
--- a/job/src/main/java/org/apache/kylin/job/cmd/ShellCmdOutput.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.cmd;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-
-/**
- * @author xjiang
- *
- */
-public class ShellCmdOutput extends BaseCommandOutput implements ICommandOutput {
-
- protected static final Logger log = LoggerFactory.getLogger(ShellCmdOutput.class);
-
- protected StringBuilder output;
- protected int exitCode;
- protected JobStepStatusEnum status;
-
- public ShellCmdOutput() {
- init();
- }
-
- private void init() {
- output = new StringBuilder();
- exitCode = -1;
- status = JobStepStatusEnum.NEW;
- }
-
- @Override
- public JobStepStatusEnum getStatus() {
- return status;
- }
-
- @Override
- public void setStatus(JobStepStatusEnum s) {
- this.status = s;
- }
-
- @Override
- public String getOutput() {
- return output.toString();
- }
-
- @Override
- public void appendOutput(String message) {
- output.append(message).append(System.getProperty("line.separator"));
- log.debug(message);
- }
-
- @Override
- public int getExitCode() {
- return exitCode;
- }
-
- @Override
- public void setExitCode(int code) {
- exitCode = code;
- }
-
- @Override
- public void reset() {
- init();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java b/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
deleted file mode 100644
index 288fd31..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopCmdOutput.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.hadoop.mapreduce.Counters;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.TaskCounter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author xduo
- *
- */
-public class HadoopCmdOutput {
-
- protected static final Logger log = LoggerFactory.getLogger(HadoopCmdOutput.class);
-
- private final StringBuilder output;
- private final Job job;
-
- public HadoopCmdOutput(Job job, StringBuilder output) {
- super();
- this.job = job;
- this.output = output;
- }
-
- public String getMrJobId() {
- return getInfo().get(ExecutableConstants.MR_JOB_ID);
- }
-
- public Map<String, String> getInfo() {
- if (job != null) {
- Map<String, String> status = new HashMap<String, String>();
- if (null != job.getJobID()) {
- status.put(ExecutableConstants.MR_JOB_ID, job.getJobID().toString());
- }
- if (null != job.getTrackingURL()) {
- status.put(ExecutableConstants.YARN_APP_URL, job.getTrackingURL().toString());
- }
- return status;
- } else {
- return Collections.emptyMap();
- }
- }
-
- private String mapInputRecords;
- private String hdfsBytesWritten;
- private String hdfsBytesRead;
-
- public String getMapInputRecords() {
- return mapInputRecords;
- }
-
- public String getHdfsBytesWritten() {
- return hdfsBytesWritten;
- }
-
- public String getHdfsBytesRead() {
- return hdfsBytesRead;
- }
-
- public void updateJobCounter() {
- try {
- Counters counters = job.getCounters();
- if (counters == null) {
- String errorMsg = "no counters for job " + getMrJobId();
- log.warn(errorMsg);
- output.append(errorMsg);
- return;
- }
- this.output.append(counters.toString()).append("\n");
- log.debug(counters.toString());
-
- mapInputRecords = String.valueOf(counters.findCounter(TaskCounter.MAP_INPUT_RECORDS).getValue());
- hdfsBytesWritten = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_WRITTEN").getValue());
- hdfsBytesRead = String.valueOf(counters.findCounter("FileSystemCounters", "HDFS_BYTES_READ").getValue());
- } catch (Exception e) {
- log.error(e.getLocalizedMessage(), e);
- output.append(e.getLocalizedMessage());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
deleted file mode 100644
index 2da5b2a..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HadoopShellExecutable.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-
-import org.apache.hadoop.util.ToolRunner;
-
-import com.google.common.base.Preconditions;
-
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.execution.AbstractExecutable;
-
-/**
- */
-public class HadoopShellExecutable extends AbstractExecutable {
-
- private static final String KEY_MR_JOB = "HADOOP_SHELL_JOB_CLASS";
- private static final String KEY_PARAMS = "HADOOP_SHELL_JOB_PARAMS";
-
- public HadoopShellExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final String mapReduceJobClass = getJobClass();
- String params = getJobParams();
- Preconditions.checkNotNull(mapReduceJobClass);
- Preconditions.checkNotNull(params);
- try {
- final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
- final AbstractHadoopJob job = constructor.newInstance();
- String[] args = params.trim().split("\\s+");
- logger.info("parameters of the HadoopShellExecutable:");
- logger.info(params);
- int result;
- StringBuilder log = new StringBuilder();
- try {
- result = ToolRunner.run(job, args);
- } catch (Exception ex) {
- logger.error("error execute " + this.toString(), ex);
- StringWriter stringWriter = new StringWriter();
- ex.printStackTrace(new PrintWriter(stringWriter));
- log.append(stringWriter.toString()).append("\n");
- result = 2;
- }
- log.append("result code:").append(result);
- return result == 0 ? new ExecuteResult(ExecuteResult.State.SUCCEED, log.toString()):new ExecuteResult(ExecuteResult.State.FAILED, log.toString());
- } catch (ReflectiveOperationException e) {
- logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } catch (Exception e) {
- logger.error("error execute " + this.toString(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setJobClass(Class<? extends AbstractHadoopJob> clazzName) {
- setParam(KEY_MR_JOB, clazzName.getName());
- }
-
- public String getJobClass() throws ExecuteException {
- return getParam(KEY_MR_JOB);
- }
-
- public void setJobParams(String param) {
- setParam(KEY_PARAMS, param);
- }
-
- public String getJobParams() {
- return getParam(KEY_PARAMS);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java b/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
deleted file mode 100644
index ffe45ed..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/HqlExecutable.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.source.hive.HiveClient;
-import org.datanucleus.store.types.backed.HashMap;
-
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.collect.Lists;
-
-import org.apache.kylin.common.util.JsonUtil;
-
-/**
- */
-public class HqlExecutable extends AbstractExecutable {
-
- private static final String HQL = "hql";
- private static final String HIVE_CONFIG = "hive-config";
-
- public HqlExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- try {
- Map<String, String> configMap = getConfiguration();
- HiveClient hiveClient = new HiveClient(configMap);
-
- for (String hql: getHqls()) {
- hiveClient.executeHQL(hql);
- }
- return new ExecuteResult(ExecuteResult.State.SUCCEED);
- } catch (Exception e) {
- logger.error("error run hive query:" + getHqls(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setConfiguration(Map<String, String> configMap) {
- if(configMap != null) {
- String configStr = "";
- try {
- configStr = JsonUtil.writeValueAsString(configMap);
- } catch (JsonProcessingException e) {
- e.printStackTrace();
- }
- setParam(HIVE_CONFIG, configStr);
- }
- }
-
-
- @SuppressWarnings("unchecked")
- private Map<String, String> getConfiguration() {
- String configStr = getParam(HIVE_CONFIG);
- Map<String, String> result = null;
- if(configStr != null) {
- try {
- result = JsonUtil.readValue(configStr, HashMap.class);
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
-
- return result;
- }
-
- public void setHqls(List<String> hqls) {
- setParam(HQL, StringUtils.join(hqls, ";"));
- }
-
- private List<String> getHqls() {
- final String hqls = getParam(HQL);
- if (hqls != null) {
- return Lists.newArrayList(StringUtils.split(hqls, ";"));
- } else {
- return Collections.emptyList();
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java b/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
deleted file mode 100644
index f8eab6c..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/MapReduceExecutable.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.lang.reflect.Constructor;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.Cluster;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.constant.JobStepStatusEnum;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecutableState;
-import org.apache.kylin.job.execution.ExecuteResult;
-import org.apache.kylin.job.execution.Output;
-import org.apache.kylin.job.hadoop.AbstractHadoopJob;
-import org.apache.kylin.job.tools.HadoopStatusChecker;
-
-import com.google.common.base.Preconditions;
-
-/**
- */
-public class MapReduceExecutable extends AbstractExecutable {
-
- private static final String KEY_MR_JOB = "MR_JOB_CLASS";
- private static final String KEY_PARAMS = "MR_JOB_PARAMS";
- private static final String KEY_COUNTER_SAVEAS = "MR_COUNTER_SAVEAS";
-
- public static final String MAP_REDUCE_WAIT_TIME = "mapReduceWaitTime";
-
- public MapReduceExecutable() {
- super();
- }
-
- @Override
- protected void onExecuteStart(ExecutableContext executableContext) {
- final Output output = executableManager.getOutput(getId());
- if (output.getExtra().containsKey(START_TIME)) {
- final String mrJobId = output.getExtra().get(ExecutableConstants.MR_JOB_ID);
- if (mrJobId == null) {
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
- return;
- }
- try {
- Job job = new Cluster(new Configuration()).getJob(JobID.forName(mrJobId));
- if (job.getJobState() == JobStatus.State.FAILED) {
- //remove previous mr job info
- super.onExecuteStart(executableContext);
- } else {
- executableManager.updateJobOutput(getId(), ExecutableState.RUNNING, null, null);
- }
- } catch (IOException e) {
- logger.warn("error get hadoop status");
- super.onExecuteStart(executableContext);
- } catch (InterruptedException e) {
- logger.warn("error get hadoop status");
- super.onExecuteStart(executableContext);
- }
- } else {
- super.onExecuteStart(executableContext);
- }
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- final String mapReduceJobClass = getMapReduceJobClass();
- String params = getMapReduceParams();
- Preconditions.checkNotNull(mapReduceJobClass);
- Preconditions.checkNotNull(params);
- try {
- Job job;
- final Map<String, String> extra = executableManager.getOutput(getId()).getExtra();
- if (extra.containsKey(ExecutableConstants.MR_JOB_ID)) {
- job = new Cluster(new Configuration()).getJob(JobID.forName(extra.get(ExecutableConstants.MR_JOB_ID)));
- logger.info("mr_job_id:" + extra.get(ExecutableConstants.MR_JOB_ID + " resumed"));
- } else {
- final Constructor<? extends AbstractHadoopJob> constructor = ClassUtil.forName(mapReduceJobClass, AbstractHadoopJob.class).getConstructor();
- final AbstractHadoopJob hadoopJob = constructor.newInstance();
- hadoopJob.setAsync(true); // so the ToolRunner.run() returns right away
- logger.info("parameters of the MapReduceExecutable:");
- logger.info(params);
- String[] args = params.trim().split("\\s+");
- try {
- //for async mr job, ToolRunner just return 0;
- ToolRunner.run(hadoopJob, args);
- } catch (Exception ex) {
- StringBuilder log = new StringBuilder();
- logger.error("error execute " + this.toString(), ex);
- StringWriter stringWriter = new StringWriter();
- ex.printStackTrace(new PrintWriter(stringWriter));
- log.append(stringWriter.toString()).append("\n");
- log.append("result code:").append(2);
- return new ExecuteResult(ExecuteResult.State.ERROR, log.toString());
- }
- job = hadoopJob.getJob();
- }
- final StringBuilder output = new StringBuilder();
- final HadoopCmdOutput hadoopCmdOutput = new HadoopCmdOutput(job, output);
-
- final String restStatusCheckUrl = getRestStatusCheckUrl(job, context.getConfig());
- if (restStatusCheckUrl == null) {
- logger.error("restStatusCheckUrl is null");
- return new ExecuteResult(ExecuteResult.State.ERROR, "restStatusCheckUrl is null");
- }
- String mrJobId = hadoopCmdOutput.getMrJobId();
- HadoopStatusChecker statusChecker = new HadoopStatusChecker(restStatusCheckUrl, mrJobId, output);
- JobStepStatusEnum status = JobStepStatusEnum.NEW;
- while (!isDiscarded()) {
- JobStepStatusEnum newStatus = statusChecker.checkStatus();
- if (status == JobStepStatusEnum.KILLED) {
- executableManager.updateJobOutput(getId(), ExecutableState.ERROR, Collections.<String, String>emptyMap(), "killed by admin");
- return new ExecuteResult(ExecuteResult.State.FAILED, "killed by admin");
- }
- if (status == JobStepStatusEnum.WAITING && (newStatus == JobStepStatusEnum.FINISHED || newStatus == JobStepStatusEnum.ERROR || newStatus == JobStepStatusEnum.RUNNING)) {
- final long waitTime = System.currentTimeMillis() - getStartTime();
- setMapReduceWaitTime(waitTime);
- }
- status = newStatus;
- executableManager.addJobInfo(getId(), hadoopCmdOutput.getInfo());
- if (status.isComplete()) {
- final Map<String, String> info = hadoopCmdOutput.getInfo();
- readCounters(hadoopCmdOutput, info);
- executableManager.addJobInfo(getId(), info);
-
- if (status == JobStepStatusEnum.FINISHED) {
- return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString());
- } else {
- return new ExecuteResult(ExecuteResult.State.FAILED, output.toString());
- }
- }
- Thread.sleep(context.getConfig().getYarnStatusCheckIntervalSeconds() * 1000);
- }
- //TODO kill discarded mr job using "hadoop job -kill " + mrJobId
-
- return new ExecuteResult(ExecuteResult.State.DISCARDED, output.toString());
-
- } catch (ReflectiveOperationException e) {
- logger.error("error getMapReduceJobClass, class name:" + getParam(KEY_MR_JOB), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- } catch (Exception e) {
- logger.error("error execute " + this.toString(), e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- private void readCounters(final HadoopCmdOutput hadoopCmdOutput, final Map<String, String> info) {
- hadoopCmdOutput.updateJobCounter();
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, hadoopCmdOutput.getMapInputRecords());
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, hadoopCmdOutput.getHdfsBytesRead());
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hadoopCmdOutput.getHdfsBytesWritten());
-
- String saveAs = getParam(KEY_COUNTER_SAVEAS);
- if (saveAs != null) {
- String[] saveAsNames = saveAs.split(",");
- saveCounterAs(hadoopCmdOutput.getMapInputRecords(), saveAsNames, 0, info);
- saveCounterAs(hadoopCmdOutput.getHdfsBytesRead(), saveAsNames, 1, info);
- saveCounterAs(hadoopCmdOutput.getHdfsBytesWritten(), saveAsNames, 2, info);
- }
- }
-
- private void saveCounterAs(String counter, String[] saveAsNames, int i, Map<String, String> info) {
- if (saveAsNames.length > i && StringUtils.isBlank(saveAsNames[i]) == false) {
- info.put(saveAsNames[i].trim(), counter);
- }
- }
-
- private String getRestStatusCheckUrl(Job job, KylinConfig config) {
- final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
- if (yarnStatusCheckUrl != null) {
- return yarnStatusCheckUrl;
- } else {
- logger.info(KylinConfig.KYLIN_JOB_YARN_APP_REST_CHECK_URL + " is not set, read from job configuration");
- }
- String rmWebHost = job.getConfiguration().get("yarn.resourcemanager.webapp.address");
- if (StringUtils.isEmpty(rmWebHost)) {
- return null;
- }
- if (rmWebHost.startsWith("http://") || rmWebHost.startsWith("https://")) {
- //do nothing
- } else {
- rmWebHost = "http://" + rmWebHost;
- }
- logger.info("yarn.resourcemanager.webapp.address:" + rmWebHost);
- return rmWebHost + "/ws/v1/cluster/apps/${job_id}?anonymous=true";
- }
-
- public long getMapReduceWaitTime() {
- return getExtraInfoAsLong(MAP_REDUCE_WAIT_TIME, 0L);
- }
-
- public void setMapReduceWaitTime(long t) {
- addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
- }
-
- public void setMapReduceJobClass(Class<? extends AbstractHadoopJob> clazzName) {
- setParam(KEY_MR_JOB, clazzName.getName());
- }
-
- public String getMapReduceJobClass() throws ExecuteException {
- return getParam(KEY_MR_JOB);
- }
-
- public void setMapReduceParams(String param) {
- setParam(KEY_PARAMS, param);
- }
-
- public String getMapReduceParams() {
- return getParam(KEY_PARAMS);
- }
-
- public String getCounterSaveAs() {
- return getParam(KEY_COUNTER_SAVEAS);
- }
-
- public void setCounterSaveAs(String value) {
- setParam(KEY_COUNTER_SAVEAS, value);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
deleted file mode 100644
index 786698e..0000000
--- a/job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.common;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.kylin.common.util.Pair;
-
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.util.Logger;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.exception.ExecuteException;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.execution.ExecutableContext;
-import org.apache.kylin.job.execution.ExecuteResult;
-
-/**
- */
-public class ShellExecutable extends AbstractExecutable {
-
- private static final String CMD = "cmd";
-
- public ShellExecutable() {
- super();
- }
-
- @Override
- protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException {
- try {
- logger.info("executing:" + getCmd());
- final ShellExecutableLogger logger = new ShellExecutableLogger();
- final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger);
- executableManager.addJobInfo(getId(), logger.getInfo());
- return new ExecuteResult(result.getFirst() == 0? ExecuteResult.State.SUCCEED: ExecuteResult.State.FAILED, result.getSecond());
- } catch (IOException e) {
- logger.error("job:" + getId() + " execute finished with exception", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
- }
- }
-
- public void setCmd(String cmd) {
- setParam(CMD, cmd);
- }
-
- public String getCmd() {
- return getParam(CMD);
- }
-
- private static class ShellExecutableLogger implements Logger {
-
- private final Map<String, String> info = Maps.newHashMap();
-
- private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager");
- private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)");
- private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)");
- private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)");
- private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write");
-
- // hive
- private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)");
- private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS");
-
- @Override
- public void log(String message) {
- Matcher matcher = PATTERN_APP_ID.matcher(message);
- if (matcher.find()) {
- String appId = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_ID, appId);
- }
-
- matcher = PATTERN_APP_URL.matcher(message);
- if (matcher.find()) {
- String appTrackingUrl = matcher.group(1);
- info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl);
- }
-
- matcher = PATTERN_JOB_ID.matcher(message);
- if (matcher.find()) {
- String mrJobID = matcher.group(1);
- info.put(ExecutableConstants.MR_JOB_ID, mrJobID);
- }
-
- matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- String hdfsWritten = matcher.group(1);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message);
- if (matcher.find()) {
- String sourceCount = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount);
- }
-
- matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message);
- if (matcher.find()) {
- String sourceSize = matcher.group(1);
- info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize);
- }
-
- // hive
- matcher = PATTERN_HIVE_APP_ID_URL.matcher(message);
- if (matcher.find()) {
- String jobId = matcher.group(1);
- String trackingUrl = matcher.group(2);
- info.put(ExecutableConstants.MR_JOB_ID, jobId);
- info.put(ExecutableConstants.YARN_APP_URL, trackingUrl);
- }
-
- matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message);
- if (matcher.find()) {
- // String hdfsRead = matcher.group(1);
- String hdfsWritten = matcher.group(2);
- info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten);
- }
- }
-
- Map<String, String> getInfo() {
- return info;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java b/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
deleted file mode 100644
index 3a64d02..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/BatchConstants.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public interface BatchConstants {
-
- public static final char INTERMEDIATE_TABLE_ROW_DELIMITER = 127;
-
- public static final String CFG_CUBE_NAME = "cube.name";
- public static final String CFG_CUBE_SEGMENT_NAME = "cube.segment.name";
-
- public static final String CFG_II_NAME = "ii.name";
- public static final String CFG_II_SEGMENT_NAME = "ii.segment.name";
-
- public static final String INPUT_DELIM = "input.delim";
- public static final String OUTPUT_PATH = "output.path";
-
- public static final String TABLE_NAME = "table.name";
- public static final String TABLE_COLUMNS = "table.columns";
-
- public static final String CFG_IS_MERGE = "is.merge";
- public static final String CFG_CUBE_INTERMEDIATE_TABLE_ROW_DELIMITER = "cube.intermediate.table.row.delimiter";
-
- public static final String MAPREDUCE_COUTNER_GROUP_NAME = "Cube Builder";
-
- public static final String MAPPER_SAMPLE_NUMBER = "mapper.sample.number";
- public static final String REGION_NUMBER = "region.number";
- public static final String CUBE_CAPACITY = "cube.capacity";
-
- public static final String CFG_STATISTICS_ENABLED = "statistics.enabled";
- public static final String CFG_STATISTICS_OUTPUT = "statistics.ouput";
- public static final String CFG_STATISTICS_SAMPLING_PERCENT = "statistics.sampling.percent";
- public static final String CFG_STATISTICS_CUBE_ESTIMATION = "cube_statistics.txt";
- public static final String CFG_STATISTICS_CUBOID_ESTIMATION = "cuboid_statistics.seq";
-
- public static final int COUNTER_MAX = 100000;
- public static final int ERROR_RECORD_THRESHOLD = 100;
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
deleted file mode 100644
index fdcfdbe..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- */
-public final class ExecutableConstants {
-
- private ExecutableConstants(){}
-
- public static final String YARN_APP_ID = "yarn_application_id";
-
- public static final String YARN_APP_URL = "yarn_application_tracking_url";
- public static final String MR_JOB_ID = "mr_job_id";
- public static final String HDFS_BYTES_WRITTEN = "hdfs_bytes_written";
- public static final String SOURCE_RECORDS_COUNT = "source_records_count";
- public static final String SOURCE_RECORDS_SIZE = "source_records_size";
- public static final String GLOBAL_LISTENER_NAME = "ChainListener";
-
-
-
-
- public static final int DEFAULT_SCHEDULER_INTERVAL_SECONDS = 60;
-
- public static final String CUBE_JOB_GROUP_NAME = "cube_job_group";
-
- public static final String DAEMON_JOB_GROUP_NAME = "daemon_job_group";
- public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
-
- public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
- public static final String STEP_NAME_FACT_DISTINCT_COLUMNS = "Extract Fact Table Distinct Columns";
- public static final String STEP_NAME_BUILD_BASE_CUBOID = "Build Base Cuboid Data";
- public static final String STEP_NAME_BUILD_IN_MEM_CUBE = "Build Cube";
- public static final String STEP_NAME_BUILD_N_D_CUBOID = "Build N-Dimension Cuboid Data";
- public static final String STEP_NAME_GET_CUBOID_KEY_DISTRIBUTION = "Calculate HTable Region Splits";
- public static final String STEP_NAME_CREATE_HBASE_TABLE = "Create HTable";
- public static final String STEP_NAME_CONVERT_CUBOID_TO_HFILE = "Convert Cuboid Data to HFile";
- public static final String STEP_NAME_BULK_LOAD_HFILE = "Load HFile to HBase Table";
- public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid Dictionary";
- public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid Statistics";
- public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid Statistics";
- public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
- public static final String STEP_NAME_UPDATE_CUBE_INFO = "Update Cube Info";
- public static final String STEP_NAME_GARBAGE_COLLECTION = "Garbage Collection";
-
- public static final String STEP_NAME_BUILD_II = "Build Inverted Index";
- public static final String STEP_NAME_CONVERT_II_TO_HFILE = "Convert Inverted Index Data to HFile";
-
- public static final String PROP_ENGINE_CONTEXT = "jobengineConfig";
- public static final String PROP_JOB_FLOW = "jobFlow";
- public static final String PROP_JOBINSTANCE_UUID = "jobInstanceUuid";
- public static final String PROP_JOBSTEP_SEQ_ID = "jobStepSequenceID";
- public static final String PROP_COMMAND = "command";
- // public static final String PROP_STORAGE_LOCATION =
- // "storageLocationIdentifier";
- public static final String PROP_JOB_ASYNC = "jobAsync";
- public static final String PROP_JOB_CMD_EXECUTOR = "jobCmdExecutor";
- public static final String PROP_JOB_CMD_OUTPUT = "jobCmdOutput";
- public static final String PROP_JOB_KILLED = "jobKilled";
- public static final String PROP_JOB_RUNTIME_FLOWS = "jobFlows";
-
- public static final String NOTIFY_EMAIL_TEMPLATE = "<div><b>Build Result of Job ${job_name}</b><pre><ul>" + "<li>Build Result: <b>${result}</b></li>" + "<li>Job Engine: ${job_engine}</li>" + "<li>Cube Name: ${cube_name}</li>" + "<li>Start Time: ${start_time}</li>" + "<li>Duration: ${duration}</li>" + "<li>MR Waiting: ${mr_waiting}</li>" + "<li>Last Update Time: ${last_update_time}</li>" + "<li>Submitter: ${submitter}</li>" + "<li>Error Log: ${error_log}</li>" + "</ul></pre><div/>";
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
deleted file mode 100644
index a4ef564..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStatusEnum.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-public enum JobStatusEnum {
-
- NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16);
-
- private final int code;
-
- private JobStatusEnum(int statusCode) {
- this.code = statusCode;
- }
-
- public static JobStatusEnum getByCode(int statusCode) {
- for (JobStatusEnum status : values()) {
- if (status.getCode() == statusCode) {
- return status;
- }
- }
-
- return null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public boolean isComplete() {
- return code == JobStatusEnum.FINISHED.getCode() || code == JobStatusEnum.ERROR.getCode() || code == JobStatusEnum.DISCARDED.getCode();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
deleted file mode 100644
index 02b40a3..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepCmdTypeEnum.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-/**
- * @author xduo, ysong1
- *
- */
-public enum JobStepCmdTypeEnum {
- SHELL_CMD, SHELL_CMD_HADOOP, JAVA_CMD_HADOOP_FACTDISTINCT, JAVA_CMD_HADOOP_BASECUBOID, JAVA_CMD_HADOOP_NDCUBOID, JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, JAVA_CMD_HADOOP_CONVERTHFILE, JAVA_CMD_HADOOP_MERGECUBOID, JAVA_CMD_HADOOP_NO_MR_DICTIONARY, JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, JAVA_CMD_HADOOP_NO_MR_BULKLOAD
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java b/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
deleted file mode 100644
index 08ee79a..0000000
--- a/job/src/main/java/org/apache/kylin/job/constant/JobStepStatusEnum.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.constant;
-
-public enum JobStepStatusEnum {
- NEW(0), PENDING(1), RUNNING(2), FINISHED(4), ERROR(8), DISCARDED(16), WAITING(32), KILLED(64);
-
- private final int code;
-
- private JobStepStatusEnum(int statusCode) {
- this.code = statusCode;
- }
-
- public static JobStepStatusEnum getByCode(int statusCode) {
- for (JobStepStatusEnum status : values()) {
- if (status.getCode() == statusCode) {
- return status;
- }
- }
-
- return null;
- }
-
- public int getCode() {
- return this.code;
- }
-
- public boolean isComplete() {
- return code == JobStepStatusEnum.FINISHED.getCode() || code == JobStepStatusEnum.ERROR.getCode() || code == JobStepStatusEnum.DISCARDED.getCode();
- }
-
- public boolean isRunable() {
- return code == JobStepStatusEnum.PENDING.getCode() || code == JobStepStatusEnum.ERROR.getCode();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
deleted file mode 100644
index 4862bb1..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableDao.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.io.IOUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.persistence.JsonSerializer;
-import org.apache.kylin.common.persistence.RawResource;
-import org.apache.kylin.common.persistence.ResourceStore;
-import org.apache.kylin.common.persistence.Serializer;
-import org.apache.kylin.job.exception.PersistentException;
-import org.apache.kylin.metadata.MetadataManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- */
-public class ExecutableDao {
-
- private static final Serializer<ExecutablePO> JOB_SERIALIZER = new JsonSerializer<ExecutablePO>(ExecutablePO.class);
- private static final Serializer<ExecutableOutputPO> JOB_OUTPUT_SERIALIZER = new JsonSerializer<ExecutableOutputPO>(ExecutableOutputPO.class);
- private static final Logger logger = LoggerFactory.getLogger(ExecutableDao.class);
- private static final ConcurrentHashMap<KylinConfig, ExecutableDao> CACHE = new ConcurrentHashMap<KylinConfig, ExecutableDao>();
- public static final String JOB_PATH_ROOT = "/execute";
- public static final String JOB_OUTPUT_ROOT = "/execute_output";
-
- private ResourceStore store;
-
- public static ExecutableDao getInstance(KylinConfig config) {
- ExecutableDao r = CACHE.get(config);
- if (r == null) {
- r = new ExecutableDao(config);
- CACHE.put(config, r);
- if (CACHE.size() > 1) {
- logger.warn("More than one singleton exist");
- }
-
- }
- return r;
- }
-
- private ExecutableDao(KylinConfig config) {
- logger.info("Using metadata url: " + config);
- this.store = MetadataManager.getInstance(config).getStore();
- }
-
- private String pathOfJob(ExecutablePO job) {
- return pathOfJob(job.getUuid());
- }
- private String pathOfJob(String uuid) {
- return JOB_PATH_ROOT + "/" + uuid;
- }
-
- private String pathOfJobOutput(String uuid) {
- return JOB_OUTPUT_ROOT + "/" + uuid;
- }
-
- private ExecutablePO readJobResource(String path) throws IOException {
- return store.getResource(path, ExecutablePO.class, JOB_SERIALIZER);
- }
-
- private void writeJobResource(String path, ExecutablePO job) throws IOException {
- store.putResource(path, job, JOB_SERIALIZER);
- }
-
- private ExecutableOutputPO readJobOutputResource(String path) throws IOException {
- return store.getResource(path, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
- }
-
- private long writeJobOutputResource(String path, ExecutableOutputPO output) throws IOException {
- return store.putResource(path, output, JOB_OUTPUT_SERIALIZER);
- }
-
- public List<ExecutableOutputPO> getJobOutputs() throws PersistentException {
- try {
- ArrayList<String> resources = store.listResources(JOB_OUTPUT_ROOT);
- if (resources == null || resources.isEmpty()) {
- return Collections.emptyList();
- }
- Collections.sort(resources);
- String rangeStart = resources.get(0);
- String rangeEnd = resources.get(resources.size() - 1);
- return store.getAllResources(rangeStart, rangeEnd, ExecutableOutputPO.class, JOB_OUTPUT_SERIALIZER);
- } catch (IOException e) {
- logger.error("error get all Jobs:", e);
- throw new PersistentException(e);
- }
- }
-
- public List<ExecutablePO> getJobs() throws PersistentException {
- try {
- final List<String> jobIds = store.listResources(JOB_PATH_ROOT);
- if (jobIds == null || jobIds.isEmpty()) {
- return Collections.emptyList();
- }
- Collections.sort(jobIds);
- String rangeStart = jobIds.get(0);
- String rangeEnd = jobIds.get(jobIds.size() - 1);
- return store.getAllResources(rangeStart, rangeEnd, ExecutablePO.class, JOB_SERIALIZER);
- } catch (IOException e) {
- logger.error("error get all Jobs:", e);
- throw new PersistentException(e);
- }
- }
-
- public List<String> getJobIds() throws PersistentException {
- try {
- ArrayList<String> resources = store.listResources(JOB_PATH_ROOT);
- if (resources == null) {
- return Collections.emptyList();
- }
- ArrayList<String> result = Lists.newArrayListWithExpectedSize(resources.size());
- for (String path : resources) {
- result.add(path.substring(path.lastIndexOf("/") + 1));
- }
- return result;
- } catch (IOException e) {
- logger.error("error get all Jobs:", e);
- throw new PersistentException(e);
- }
- }
-
- public ExecutablePO getJob(String uuid) throws PersistentException {
- try {
- return readJobResource(pathOfJob(uuid));
- } catch (IOException e) {
- logger.error("error get job:" + uuid, e);
- throw new PersistentException(e);
- }
- }
-
- public ExecutablePO addJob(ExecutablePO job) throws PersistentException {
- try {
- if (getJob(job.getUuid()) != null) {
- throw new IllegalArgumentException("job id:" + job.getUuid() + " already exists");
- }
- writeJobResource(pathOfJob(job), job);
- return job;
- } catch (IOException e) {
- logger.error("error save job:" + job.getUuid(), e);
- throw new PersistentException(e);
- }
- }
-
- public void deleteJob(String uuid) throws PersistentException {
- try {
- store.deleteResource(pathOfJob(uuid));
- } catch (IOException e) {
- logger.error("error delete job:" + uuid, e);
- throw new PersistentException(e);
- }
- }
-
- public ExecutableOutputPO getJobOutput(String uuid) throws PersistentException {
- try {
- ExecutableOutputPO result = readJobOutputResource(pathOfJobOutput(uuid));
- if (result == null) {
- result = new ExecutableOutputPO();
- result.setUuid(uuid);
- return result;
- }
- return result;
- } catch (IOException e) {
- logger.error("error get job output id:" + uuid, e);
- throw new PersistentException(e);
- }
- }
-
- public void addJobOutput(ExecutableOutputPO output) throws PersistentException {
- try {
- output.setLastModified(0);
- writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
- } catch (IOException e) {
- logger.error("error update job output id:" + output.getUuid(), e);
- throw new PersistentException(e);
- }
- }
-
- public void updateJobOutput(ExecutableOutputPO output) throws PersistentException {
- try {
- final long ts = writeJobOutputResource(pathOfJobOutput(output.getUuid()), output);
- output.setLastModified(ts);
- } catch (IOException e) {
- logger.error("error update job output id:" + output.getUuid(), e);
- throw new PersistentException(e);
- }
- }
-
- public void deleteJobOutput(String uuid) throws PersistentException {
- try {
- store.deleteResource(pathOfJobOutput(uuid));
- } catch (IOException e) {
- logger.error("error delete job:" + uuid, e);
- throw new PersistentException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
deleted file mode 100644
index 4dacd8a..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutableOutputPO.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutableOutputPO extends RootPersistentEntity {
-
- @JsonProperty("content")
- private String content;
-
- @JsonProperty("status")
- private String status = "READY";
-
- @JsonProperty("info")
- private Map<String, String> info = Maps.newHashMap();
-
- public String getContent() {
- return content;
- }
-
- public void setContent(String content) {
- this.content = content;
- }
-
- public String getStatus() {
- return status;
- }
-
- public void setStatus(String status) {
- this.status = status;
- }
-
- public Map<String, String> getInfo() {
- return info;
- }
-
- public void setInfo(Map<String, String> info) {
- this.info = info;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java b/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
deleted file mode 100644
index 6a17b29..0000000
--- a/job/src/main/java/org/apache/kylin/job/dao/ExecutablePO.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.dao;
-
-import java.util.List;
-import java.util.Map;
-
-import com.fasterxml.jackson.annotation.JsonAutoDetect;
-import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.collect.Maps;
-import org.apache.kylin.common.persistence.RootPersistentEntity;
-
-/**
- */
-@JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE)
-public class ExecutablePO extends RootPersistentEntity {
-
- @JsonProperty("name")
- private String name;
-
- @JsonProperty("tasks")
- private List<ExecutablePO> tasks;
-
- @JsonProperty("type")
- private String type;
-
- @JsonProperty("params")
- private Map<String, String> params = Maps.newHashMap();
-
- public String getName() {
- return name;
- }
-
- public void setName(String name) {
- this.name = name;
- }
-
- public List<ExecutablePO> getTasks() {
- return tasks;
- }
-
- public void setTasks(List<ExecutablePO> tasks) {
- this.tasks = tasks;
- }
-
- public String getType() {
- return type;
- }
-
- public void setType(String type) {
- this.type = type;
- }
-
- public Map<String, String> getParams() {
- return params;
- }
-
- public void setParams(Map<String, String> params) {
- this.params = params;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
index 1aeb50f..b03cb5f 100644
--- a/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
+++ b/job/src/main/java/org/apache/kylin/job/deployment/HbaseConfigPrinterCLI.java
@@ -27,8 +27,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
-
-import org.apache.kylin.job.tools.LZOSupportnessChecker;
+import org.apache.kylin.storage.hbase.util.LZOSupportnessChecker;
/**
* <p/>
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java b/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
deleted file mode 100644
index 2eb9b31..0000000
--- a/job/src/main/java/org/apache/kylin/job/engine/JobEngineConfig.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.engine;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.tools.OptionsHelper;
-import org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.*;
-
-/**
- * @author ysong1
- */
-public class JobEngineConfig {
- private static final Logger logger = LoggerFactory.getLogger(JobEngineConfig.class);
- public static String HADOOP_JOB_CONF_FILENAME = "kylin_job_conf";
- public static String HIVE_CONF_FILENAME = "kylin_hive_conf";
-
- private static File getJobConfig(String fileName) {
- String path = System.getProperty(KylinConfig.KYLIN_CONF);
- if (StringUtils.isNotEmpty(path)) {
- return new File(path, fileName);
- }
-
- path = KylinConfig.getKylinHome();
- if (StringUtils.isNotEmpty(path)) {
- return new File(path + File.separator + "conf", fileName);
- }
- return null;
- }
-
- private String getHadoopJobConfFilePath(RealizationCapacity capaticy, boolean appendSuffix) throws IOException {
- String hadoopJobConfFile;
- if (appendSuffix) {
- hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + "_" + capaticy.toString().toLowerCase() + ".xml");
- } else {
- hadoopJobConfFile = (HADOOP_JOB_CONF_FILENAME + ".xml");
- }
-
- File jobConfig = getJobConfig(hadoopJobConfFile);
- if (jobConfig == null || !jobConfig.exists()) {
- logger.warn("fail to locate " + hadoopJobConfFile + ", trying to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
- jobConfig = getJobConfig(HADOOP_JOB_CONF_FILENAME + ".xml");
- if (jobConfig == null || !jobConfig.exists()) {
- logger.error("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
- throw new RuntimeException("fail to locate " + HADOOP_JOB_CONF_FILENAME + ".xml");
- }
- }
- return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
- }
-
- public String getHadoopJobConfFilePath(RealizationCapacity capaticy) throws IOException {
- String path = getHadoopJobConfFilePath(capaticy, true);
- if (!StringUtils.isEmpty(path)) {
- logger.info("Chosen job conf is : " + path);
- return path;
- } else {
- path = getHadoopJobConfFilePath(capaticy, false);
- if (!StringUtils.isEmpty(path)) {
- logger.info("Chosen job conf is : " + path);
- return path;
- }
- }
- return "";
- }
-
-
- public String getHiveConfFilePath() throws IOException {
- String hiveConfFile = (HIVE_CONF_FILENAME + ".xml");
-
- File jobConfig = getJobConfig(hiveConfFile);
- if (jobConfig == null || !jobConfig.exists()) {
-
- logger.error("fail to locate " + HIVE_CONF_FILENAME + ".xml");
- throw new RuntimeException("fail to locate " + HIVE_CONF_FILENAME + ".xml");
- }
- return OptionsHelper.convertToFileURL(jobConfig.getAbsolutePath());
- }
-
- // there should be no setters
- private final KylinConfig config;
-
- public JobEngineConfig(KylinConfig config) {
- this.config = config;
- }
-
- public KylinConfig getConfig() {
- return config;
- }
-
- public String getHdfsWorkingDirectory() {
- return config.getHdfsWorkingDirectory();
- }
-
- /**
- * @return the maxConcurrentJobLimit
- */
- public int getMaxConcurrentJobLimit() {
- return config.getMaxConcurrentJobLimit();
- }
-
- /**
- * @return the timeZone
- */
- public String getTimeZone() {
- return config.getTimeZone();
- }
-
- /**
- * @return the adminDls
- */
- public String getAdminDls() {
- return config.getAdminDls();
- }
-
- /**
- * @return the jobStepTimeout
- */
- public long getJobStepTimeout() {
- return config.getJobStepTimeout();
- }
-
- /**
- * @return the asyncJobCheckInterval
- */
- public int getAsyncJobCheckInterval() {
- return config.getYarnStatusCheckIntervalSeconds();
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#hashCode()
- */
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((config == null) ? 0 : config.hashCode());
- return result;
- }
-
- /*
- * (non-Javadoc)
- *
- * @see java.lang.Object#equals(java.lang.Object)
- */
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- JobEngineConfig other = (JobEngineConfig) obj;
- if (config == null) {
- if (other.config != null)
- return false;
- } else if (!config.equals(other.config))
- return false;
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java b/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
deleted file mode 100644
index 8544fff..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/ExecuteException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class ExecuteException extends Exception {
-
- private static final long serialVersionUID = 5677121412192984281L;
-
- public ExecuteException() {
- }
-
- public ExecuteException(String message) {
- super(message);
- }
-
- public ExecuteException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public ExecuteException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java b/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
deleted file mode 100644
index f19b0ca..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/IllegalStateTranferException.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class IllegalStateTranferException extends RuntimeException {
-
- private static final long serialVersionUID = 8466551519300132702L;
-
- public IllegalStateTranferException() {
- }
-
- public IllegalStateTranferException(String message) {
- super(message);
- }
-
- public IllegalStateTranferException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public IllegalStateTranferException(Throwable cause) {
- super(cause);
- }
-
- public IllegalStateTranferException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/exception/JobException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/JobException.java b/job/src/main/java/org/apache/kylin/job/exception/JobException.java
deleted file mode 100644
index ba4c52a..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/JobException.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- * @author xduo
- *
- */
-public class JobException extends Exception {
-
- private static final long serialVersionUID = 1L;
-
- /**
- *
- */
- public JobException() {
- super();
- }
-
- /**
- * @param message
- * @param cause
- */
- public JobException(String message, Throwable cause) {
- super(message, cause);
- }
-
- /**
- * @param message
- */
- public JobException(String message) {
- super(message);
- }
-
- /**
- * @param cause
- */
- public JobException(Throwable cause) {
- super(cause);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/7663fff4/job/src/main/java/org/apache/kylin/job/exception/LockException.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/exception/LockException.java b/job/src/main/java/org/apache/kylin/job/exception/LockException.java
deleted file mode 100644
index cf43ac9..0000000
--- a/job/src/main/java/org/apache/kylin/job/exception/LockException.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.job.exception;
-
-/**
- */
-public class LockException extends Exception {
- private static final long serialVersionUID = 2072745879281754945L;
-
- public LockException() {
- }
-
- public LockException(String message) {
- super(message);
- }
-
- public LockException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public LockException(Throwable cause) {
- super(cause);
- }
-
- public LockException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
- super(message, cause, enableSuppression, writableStackTrace);
- }
-}