You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/02/12 06:22:37 UTC
[17/97] [abbrv] [partial] incubator-kylin git commit: cleanup for
migration from github.com
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/tools/DeployCoprocessorCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/DeployCoprocessorCLI.java b/job/src/main/java/com/kylinolap/job/tools/DeployCoprocessorCLI.java
deleted file mode 100644
index 5721d67..0000000
--- a/job/src/main/java/com/kylinolap/job/tools/DeployCoprocessorCLI.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.tools;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Matcher;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.HadoopUtil;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-
-/**
- * @author yangli9
- */
-public class DeployCoprocessorCLI {
-
- private static final Logger logger = LoggerFactory.getLogger(DeployCoprocessorCLI.class);
-
- public static final String AGGR_COPROCESSOR_CLS_NAME = "com.kylinolap.storage.hbase.observer.AggregateRegionObserver";
-
-    /**
-     * Uploads the local coprocessor jar named by {@code args[0]} to HDFS and re-points the
-     * aggregate coprocessor of every table returned by {@code getHTableNames} at that jar.
-     *
-     * @param args {@code args[0]} is the path of the local coprocessor jar
-     * @throws IOException if HDFS or HBase cannot be reached or the upload fails
-     */
-    public static void main(String[] args) throws IOException {
-        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
-        Configuration hconf = HadoopUtil.newHBaseConfiguration(kylinConfig.getStorageUrl());
-        FileSystem fileSystem = FileSystem.get(hconf);
-        HBaseAdmin hbaseAdmin = new HBaseAdmin(hconf);
-
-        Path hdfsCoprocessorJar;
-        List<String> processedTables;
-        try {
-            String localCoprocessorJar = new File(args[0]).getAbsolutePath();
-            logger.info("Identify coprocessor jar " + localCoprocessorJar);
-
-            List<String> tableNames = getHTableNames(kylinConfig);
-            logger.info("Identify tables " + tableNames);
-
-            Set<String> oldJarPaths = getCoprocessorJarPaths(hbaseAdmin, tableNames);
-            logger.info("Old coprocessor jar: " + oldJarPaths);
-
-            hdfsCoprocessorJar = uploadCoprocessorJar(localCoprocessorJar, fileSystem, oldJarPaths);
-            logger.info("New coprocessor jar: " + hdfsCoprocessorJar);
-
-            processedTables = resetCoprocessorOnHTables(hbaseAdmin, hdfsCoprocessorJar, tableNames);
-
-            // Don't remove old jars, missing coprocessor jar will fail hbase
-            // removeOldJars(oldJarPaths, fileSystem);
-        } finally {
-            // release the admin connection even when one of the steps above throws
-            hbaseAdmin.close();
-        }
-
-        logger.info("Processed " + processedTables);
-        logger.info("Active coprocessor jar: " + hdfsCoprocessorJar);
-    }
-
-    /**
-     * Adds the aggregate region observer, loaded from the given jar, to a table descriptor.
-     *
-     * @param desc               descriptor to modify (only the in-memory object is changed here)
-     * @param hdfsCoprocessorJar jar that contains the observer class
-     * @throws IOException if the descriptor rejects the coprocessor
-     */
-    public static void setCoprocessorOnHTable(HTableDescriptor desc, Path hdfsCoprocessorJar) throws IOException {
-        final String htableName = desc.getNameAsString();
-        logger.info("Set coprocessor on " + htableName);
-        desc.addCoprocessor(AGGR_COPROCESSOR_CLS_NAME, hdfsCoprocessorJar, 1001, null);
-    }
-
-    /**
-     * Replaces every registration of the aggregate coprocessor on one table with a single
-     * registration that points at {@code hdfsCoprocessorJar}.
-     * <p>
-     * The table is disabled for the duration of the change and is re-enabled afterwards, also
-     * when reading or modifying the descriptor fails.
-     *
-     * @param tableName          name of the HBase table
-     * @param hbaseAdmin         admin handle used for all table operations
-     * @param hdfsCoprocessorJar jar to load the coprocessor from
-     * @throws IOException if any HBase operation fails
-     */
-    public static void resetCoprocessor(String tableName, HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar) throws IOException {
-        logger.info("Disable " + tableName);
-        hbaseAdmin.disableTable(tableName);
-
-        try {
-            logger.info("Unset coprocessor on " + tableName);
-            HTableDescriptor desc = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-            // the class may have been registered more than once; drop all registrations
-            while (desc.hasCoprocessor(AGGR_COPROCESSOR_CLS_NAME)) {
-                desc.removeCoprocessor(AGGR_COPROCESSOR_CLS_NAME);
-            }
-
-            setCoprocessorOnHTable(desc, hdfsCoprocessorJar);
-            hbaseAdmin.modifyTable(tableName, desc);
-        } finally {
-            // never leave the table offline because the descriptor update failed
-            logger.info("Enable " + tableName);
-            hbaseAdmin.enableTable(tableName);
-        }
-    }
-
- private static List<String> resetCoprocessorOnHTables(HBaseAdmin hbaseAdmin, Path hdfsCoprocessorJar, List<String> tableNames) throws IOException {
- List<String> processed = new ArrayList<String>();
-
- for (String tableName : tableNames) {
- try {
- resetCoprocessor(tableName, hbaseAdmin, hdfsCoprocessorJar);
- processed.add(tableName);
- } catch (IOException ex) {
- logger.error("Error processing " + tableName, ex);
- }
- }
- return processed;
- }
-
-    /**
-     * Finds the most recently modified {@code .jar} in the coprocessor directory.
-     *
-     * @return the qualified path of that jar, or {@code null} when the directory holds no jar
-     * @throws IOException if the directory cannot be created or listed
-     */
-    public static Path getNewestCoprocessorJar(KylinConfig config, FileSystem fileSystem) throws IOException {
-        FileStatus latest = null;
-        for (FileStatus candidate : fileSystem.listStatus(getCoprocessorHDFSDir(fileSystem, config))) {
-            if (!candidate.getPath().toString().endsWith(".jar")) {
-                continue;
-            }
-            // strictly newer wins; on equal timestamps the first one listed is kept
-            if (latest == null || latest.getModificationTime() < candidate.getModificationTime()) {
-                latest = candidate;
-            }
-        }
-
-        if (latest == null) {
-            return null;
-        }
-
-        Path qualified = latest.getPath().makeQualified(fileSystem.getUri(), null);
-        logger.info("The newest coprocessor is " + qualified.toString());
-        return qualified;
-    }
-
-    /**
-     * Makes sure the local coprocessor jar is present in the HDFS coprocessor directory and
-     * returns its qualified HDFS path.
-     * <p>
-     * An existing HDFS file is reused when both its length and its modification time equal
-     * those of the local file. Otherwise the jar is uploaded as {@code <base>-<n>.jar}, where
-     * {@code n} is the smallest non-negative number not used by any known jar, and the
-     * uploaded file's modification time is set to that of the local file.
-     *
-     * @param localCoprocessorJar path of the jar on the local file system
-     * @param fileSystem          file system that holds the coprocessor directory
-     * @param oldJarPaths         jar paths already known; may be {@code null}. Jars seen in
-     *                            the coprocessor directory are added to this set.
-     * @return qualified path of the reused or newly uploaded jar
-     * @throws IOException if listing, creating or writing the HDFS file fails
-     */
-    public static Path uploadCoprocessorJar(String localCoprocessorJar, FileSystem fileSystem, Set<String> oldJarPaths) throws IOException {
-        Path uploadPath = null;
-        File localCoprocessorFile = new File(localCoprocessorJar);
-
-        // check existing jars
-        if (oldJarPaths == null) {
-            oldJarPaths = new HashSet<String>();
-        }
-        Path coprocessorDir = getCoprocessorHDFSDir(fileSystem, KylinConfig.getInstanceFromEnv());
-        for (FileStatus fileStatus : fileSystem.listStatus(coprocessorDir)) {
-            // compare against the size of the local FILE, not the length of its path string
-            if (fileStatus.getLen() == localCoprocessorFile.length() && fileStatus.getModificationTime() == localCoprocessorFile.lastModified()) {
-                uploadPath = fileStatus.getPath();
-                break;
-            }
-            String filename = fileStatus.getPath().toString();
-            if (filename.endsWith(".jar")) {
-                oldJarPaths.add(filename);
-            }
-        }
-
-        // upload if not existing
-        if (uploadPath == null) {
-            // figure out a unique new jar file name
-            Set<String> oldJarNames = new HashSet<String>();
-            for (String path : oldJarPaths) {
-                oldJarNames.add(new Path(path).getName());
-            }
-            String baseName = getBaseFileName(localCoprocessorJar);
-            String newName = null;
-            int i = 0;
-            while (newName == null) {
-                newName = baseName + "-" + (i++) + ".jar";
-                if (oldJarNames.contains(newName)) {
-                    newName = null;
-                }
-            }
-
-            // upload
-            uploadPath = new Path(coprocessorDir, newName);
-            FileInputStream in = null;
-            FSDataOutputStream out = null;
-            try {
-                in = new FileInputStream(localCoprocessorFile);
-                out = fileSystem.create(uploadPath);
-                IOUtils.copy(in, out);
-                // close explicitly so a failed flush surfaces instead of leaving a truncated jar
-                out.close();
-                out = null;
-            } finally {
-                IOUtils.closeQuietly(in);
-                IOUtils.closeQuietly(out);
-            }
-
-            // stamp the local modification time so the next run recognises this upload
-            fileSystem.setTimes(uploadPath, localCoprocessorFile.lastModified(), System.currentTimeMillis());
-
-        }
-
-        uploadPath = uploadPath.makeQualified(fileSystem.getUri(), null);
-        return uploadPath;
-    }
-
- private static String getBaseFileName(String localCoprocessorJar) {
- File localJar = new File(localCoprocessorJar);
- String baseName = localJar.getName();
- if (baseName.endsWith(".jar"))
- baseName = baseName.substring(0, baseName.length() - ".jar".length());
- return baseName;
- }
-
- private static Path getCoprocessorHDFSDir(FileSystem fileSystem, KylinConfig config) throws IOException {
- String hdfsWorkingDirectory = config.getHdfsWorkingDirectory();
- Path coprocessorDir = new Path(hdfsWorkingDirectory, "coprocessor");
- fileSystem.mkdirs(coprocessorDir);
- return coprocessorDir;
- }
-
- private static Set<String> getCoprocessorJarPaths(HBaseAdmin hbaseAdmin, List<String> tableNames) throws IOException {
- HashSet<String> result = new HashSet<String>();
-
- for (String tableName : tableNames) {
- HTableDescriptor tableDescriptor = null;
- try {
- tableDescriptor = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
- } catch (TableNotFoundException e) {
- logger.warn("Table not found " + tableName, e);
- continue;
- }
-
- Matcher keyMatcher;
- Matcher valueMatcher;
- for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> e : tableDescriptor.getValues().entrySet()) {
- keyMatcher = HConstants.CP_HTD_ATTR_KEY_PATTERN.matcher(Bytes.toString(e.getKey().get()));
- if (!keyMatcher.matches()) {
- continue;
- }
- valueMatcher = HConstants.CP_HTD_ATTR_VALUE_PATTERN.matcher(Bytes.toString(e.getValue().get()));
- if (!valueMatcher.matches()) {
- continue;
- }
-
- String jarPath = valueMatcher.group(1).trim();
- String clsName = valueMatcher.group(2).trim();
-
- if (AGGR_COPROCESSOR_CLS_NAME.equals(clsName)) {
- result.add(jarPath);
- }
- }
- }
-
- return result;
- }
-
- private static List<String> getHTableNames(KylinConfig config) {
- CubeManager cubeMgr = CubeManager.getInstance(config);
-
- ArrayList<String> result = new ArrayList<String>();
- for (CubeInstance cube : cubeMgr.listAllCubes()) {
- for (CubeSegment seg : cube.getSegments(CubeSegmentStatusEnum.READY)) {
- String tableName = seg.getStorageLocationIdentifier();
- if (StringUtils.isBlank(tableName) == false)
- result.add(tableName);
- }
- }
-
- return result;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/tools/HadoopStatusChecker.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/HadoopStatusChecker.java b/job/src/main/java/com/kylinolap/job/tools/HadoopStatusChecker.java
deleted file mode 100644
index c11698a..0000000
--- a/job/src/main/java/com/kylinolap/job/tools/HadoopStatusChecker.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.tools;
-
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-
-import org.apache.commons.httpclient.Header;
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.HttpMethod;
-import org.apache.commons.httpclient.methods.GetMethod;
-import org.apache.commons.httpclient.protocol.Protocol;
-import org.apache.commons.httpclient.protocol.ProtocolSocketFactory;
-import org.apache.hadoop.mapreduce.JobStatus;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
-import org.codehaus.jackson.JsonNode;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.job.constant.JobStepStatusEnum;
-
-/**
- * @author xduo
- *
- */
-public class HadoopStatusChecker {
-
- protected static final Logger log = LoggerFactory.getLogger(HadoopStatusChecker.class);
-
- private final String yarnUrl;
- private final String mrJobID;
- private final StringBuilder output;
-
-    /**
-     * @param yarnUrl status URL template; the literal {@code ${job_id}} is replaced per check
-     * @param mrJobID MapReduce job id, may be {@code null}
-     * @param output  buffer that receives human-readable progress lines
-     */
-    public HadoopStatusChecker(String yarnUrl, String mrJobID, StringBuilder output) {
-        this.output = output;
-        this.mrJobID = mrJobID;
-        this.yarnUrl = yarnUrl;
-    }
-
- public JobStepStatusEnum checkStatus() {
- if (null == mrJobID) {
- this.output.append("Skip status check with empty job id..\n");
- return JobStepStatusEnum.WAITING;
- }
-
- String applicationId = mrJobID.replace("job", "application");
- String url = yarnUrl.replace("${job_id}", applicationId);
- JobStepStatusEnum status = null;
- String checkResponse = null;
- try {
- checkResponse = getHttpResponse(url);
- JsonNode root = new ObjectMapper().readTree(checkResponse);
- RMAppState state = RMAppState.valueOf(root.findValue("state").getTextValue());
- FinalApplicationStatus finalStatus = FinalApplicationStatus.valueOf(root.findValue("finalStatus").getTextValue());
-
- log.debug("State of Hadoop job: " + mrJobID + ":" + state + "-" + finalStatus);
- output.append(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.S").format(new Date()) + " - State of Hadoop job: " + mrJobID + ":" + state + " - " + finalStatus + "\n");
-
- switch (finalStatus) {
- case SUCCEEDED:
- status = JobStepStatusEnum.FINISHED;
- break;
- case FAILED:
- status = JobStepStatusEnum.ERROR;
- break;
- case KILLED:
- status = JobStepStatusEnum.ERROR;
- break;
- case UNDEFINED:
- switch (state) {
- case NEW:
- case NEW_SAVING:
- case SUBMITTED:
- case ACCEPTED:
- status = JobStepStatusEnum.WAITING;
- break;
- case RUNNING:
- status = JobStepStatusEnum.RUNNING;
- break;
- case FINAL_SAVING:
- case FINISHING:
- case FINISHED:
- case FAILED:
- case KILLING:
- case KILLED:
- }
- break;
- }
-
- } catch (Exception e) {
- output.append("Failed to get status from response with url + " + url + "\n");
- output.append("Exception: " + e.getLocalizedMessage() + "\n");
- log.error("Failed to get status from response with url + " + url + "!\n" + checkResponse, e);
- status = JobStepStatusEnum.ERROR;
- }
-
- return status;
- }
-
-    /**
-     * Fetches the body at {@code url}, following redirects announced through a
-     * {@code Refresh} response header ({@code ...url=<target>}).
-     * <p>
-     * {@code anonymous=true} is appended to every URL that does not already contain it, and
-     * the lenient https protocol is registered before the first https request. Each
-     * connection is released even when the request or the body read throws.
-     *
-     * @param url first URL to request
-     * @return the body of the first response that does not redirect; may be {@code null} when
-     *         that response has no body
-     * @throws IOException if a request fails
-     */
-    private String getHttpResponse(String url) throws IOException {
-        HttpClient client = new HttpClient();
-
-        while (true) { // follow redirects via 'refresh'
-            if (url.startsWith("https://")) {
-                registerEasyHttps();
-            }
-            if (url.contains("anonymous=true") == false) {
-                url += url.contains("?") ? "&" : "?";
-                url += "anonymous=true";
-            }
-
-            HttpMethod get = new GetMethod(url);
-            try {
-                client.executeMethod(get);
-
-                String redirect = null;
-                Header h = get.getResponseHeader("Refresh");
-                if (h != null) {
-                    String s = h.getValue();
-                    int cut = s.indexOf("url=");
-                    if (cut >= 0) {
-                        redirect = s.substring(cut + 4);
-                    }
-                }
-
-                if (redirect == null) {
-                    // return even a null body; re-requesting the same URL would loop forever
-                    String response = get.getResponseBodyAsString();
-                    output.append("Job " + mrJobID + " get status check result.\n");
-                    log.debug("Job " + mrJobID + " get status check result.\n");
-                    return response;
-                }
-
-                url = redirect;
-                output.append("Job " + mrJobID + " check redirect url " + url + ".\n");
-                log.debug("Job " + mrJobID + " check redirect url " + url + ".\n");
-            } finally {
-                // always hand the connection back, also when the request or body read throws
-                get.releaseConnection();
-            }
-        }
-    }
-
- private static Protocol EASY_HTTPS = null;
-
-    /**
-     * Registers, once, an "https" protocol on port 443 that is backed by
-     * {@code DefaultSslProtocolSocketFactory}. Later calls do nothing.
-     */
-    private static void registerEasyHttps() {
-        if (EASY_HTTPS != null) {
-            return;
-        }
-        // by pass all https issue
-        ProtocolSocketFactory socketFactory = (ProtocolSocketFactory) new DefaultSslProtocolSocketFactory();
-        EASY_HTTPS = new Protocol("https", socketFactory, 443);
-        Protocol.registerProtocol("https", EASY_HTTPS);
-    }
-
- public JobStepStatusEnum calculateStatus(JobStatus jobStatus) {
- JobStepStatusEnum status;
- switch (jobStatus.getState()) {
- case RUNNING:
- status = JobStepStatusEnum.RUNNING;
- break;
- case SUCCEEDED:
- status = JobStepStatusEnum.FINISHED;
- break;
- case FAILED:
- status = JobStepStatusEnum.ERROR;
- break;
- case PREP:
- status = JobStepStatusEnum.WAITING;
- break;
- case KILLED:
- status = JobStepStatusEnum.ERROR;
- break;
- default:
- status = JobStepStatusEnum.ERROR;
- }
-
- return status;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/tools/HtableAlterMetadataCLI.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/HtableAlterMetadataCLI.java b/job/src/main/java/com/kylinolap/job/tools/HtableAlterMetadataCLI.java
deleted file mode 100644
index 66f9702..0000000
--- a/job/src/main/java/com/kylinolap/job/tools/HtableAlterMetadataCLI.java
+++ /dev/null
@@ -1,72 +0,0 @@
-package com.kylinolap.job.tools;
-
-import com.kylinolap.job.hadoop.AbstractHadoopJob;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.util.ToolRunner;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Created by honma on 11/11/14.
- */
-@SuppressWarnings("static-access")
-public class HtableAlterMetadataCLI extends AbstractHadoopJob {
-
- private static final Option OPTION_METADATA_KEY = OptionBuilder.withArgName("key").hasArg().isRequired(true).withDescription("The metadata key").create("key");
- private static final Option OPTION_METADATA_VALUE = OptionBuilder.withArgName("value").hasArg().isRequired(true).withDescription("The metadata value").create("value");
-
- protected static final Logger log = LoggerFactory.getLogger(HtableAlterMetadataCLI.class);
-
- String tableName;
- String metadataKey;
- String metadataValue;
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
- try {
- options.addOption(OPTION_HTABLE_NAME);
- options.addOption(OPTION_METADATA_KEY);
- options.addOption(OPTION_METADATA_VALUE);
-
- parseOptions(options, args);
- tableName = getOptionValue(OPTION_HTABLE_NAME);
- metadataKey = getOptionValue(OPTION_METADATA_KEY);
- metadataValue = getOptionValue(OPTION_METADATA_VALUE);
-
- alter();
-
- return 0;
- } catch (Exception e) {
- e.printStackTrace(System.err);
- log.error(e.getLocalizedMessage(), e);
- return 2;
- }
- }
-
- private void alter() throws IOException {
- Configuration conf = HBaseConfiguration.create();
- HBaseAdmin hbaseAdmin = new HBaseAdmin(conf);
- HTableDescriptor table = hbaseAdmin.getTableDescriptor(TableName.valueOf(tableName));
-
- hbaseAdmin.disableTable(table.getTableName());
- table.setValue(metadataKey, metadataValue);
- hbaseAdmin.modifyTable(table.getTableName(), table);
- hbaseAdmin.enableTable(table.getTableName());
- hbaseAdmin.close();
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new HtableAlterMetadataCLI(), args);
- System.exit(exitCode);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/tools/LZOSupportnessChecker.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/LZOSupportnessChecker.java b/job/src/main/java/com/kylinolap/job/tools/LZOSupportnessChecker.java
deleted file mode 100644
index 62dc594..0000000
--- a/job/src/main/java/com/kylinolap/job/tools/LZOSupportnessChecker.java
+++ /dev/null
@@ -1,21 +0,0 @@
-package com.kylinolap.job.tools;
-
-import org.apache.hadoop.hbase.util.CompressionTest;
-
-import java.io.File;
-
-/**
- * Created by honma on 10/21/14.
- */
-public class LZOSupportnessChecker {
- public static boolean getSupportness()
- {
- try {
- File temp = File.createTempFile("test", ".tmp");
- CompressionTest.main(new String[] { "file://" + temp.toString(), "lzo" });
- } catch (Exception e) {
- return false;
- }
- return true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/main/java/com/kylinolap/job/tools/OptionsHelper.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/com/kylinolap/job/tools/OptionsHelper.java b/job/src/main/java/com/kylinolap/job/tools/OptionsHelper.java
deleted file mode 100644
index ea0b7e3..0000000
--- a/job/src/main/java/com/kylinolap/job/tools/OptionsHelper.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.tools;
-
-import java.io.File;
-
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class OptionsHelper {
-    private CommandLine commandLine;
-
-    /** Parses {@code args} against {@code options} with a {@link GnuParser} and keeps the result. */
-    public void parseOptions(Options options, String[] args) throws ParseException {
-        commandLine = new GnuParser().parse(options, args);
-    }
-
-    /** Returns the options of the last parsed command line. */
-    public Option[] getOptions() {
-        return commandLine.getOptions();
-    }
-
-    /**
-     * Renders the parsed options as {@code " opt"} or {@code " opt=value"} fragments joined
-     * into one string. Each fragment starts with a space.
-     */
-    public String getOptionsAsString() {
-        final StringBuilder text = new StringBuilder();
-        for (Option parsed : commandLine.getOptions()) {
-            text.append(" ").append(parsed.getOpt());
-            if (parsed.hasArg()) {
-                text.append("=").append(parsed.getValue());
-            }
-        }
-        return text.toString();
-    }
-
-    /** Returns the value given for {@code option} on the parsed command line. */
-    public String getOptionValue(Option option) {
-        final String shortName = option.getOpt();
-        return commandLine.getOptionValue(shortName);
-    }
-
-    /** Tells whether {@code option} was present on the parsed command line. */
-    public boolean hasOption(Option option) {
-        final String shortName = option.getOpt();
-        return commandLine.hasOption(shortName);
-    }
-
-    /** Prints the usage help for {@code options} under the given program name. */
-    public void printUsage(String programName, Options options) {
-        new HelpFormatter().printHelp(programName, options);
-    }
-
-    /**
-     * Replaces the platform file separator in {@code path} with {@code '/'}. The path is
-     * returned untouched on platforms whose separator already is {@code '/'}.
-     */
-    public static String convertToFileURL(String path) {
-        return File.separatorChar == '/' ? path : path.replace(File.separatorChar, '/');
-    }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java b/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java
deleted file mode 100644
index dc56ad8..0000000
--- a/job/src/test/java/com/kylinolap/job/BuildCubeWithEngineTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TimeZone;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.quartz.SchedulerException;
-
-import com.google.common.collect.Lists;
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.ClasspathUtil;
-import com.kylinolap.common.util.HBaseMetadataTestCase;
-import com.kylinolap.common.util.JsonUtil;
-import com.kylinolap.cube.CubeBuildTypeEnum;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.exception.CubeIntegrityException;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.exception.InvalidJobInstanceException;
-
-/**
- * @author ysong1
- */
-public class BuildCubeWithEngineTest extends HBaseMetadataTestCase {
-
- protected JobManager jobManager;
- protected JobEngineConfig engineConfig;
-
-    /** Puts the absolute path of the sandbox test data directory on the classpath. */
-    @BeforeClass
-    public static void beforeClass() throws Exception {
-        final File sandboxDir = new File(SANDBOX_TEST_DATA);
-        ClasspathUtil.addClasspath(sandboxDir.getAbsolutePath());
-    }
-
- /**
-  * Prepares each test: creates the test metadata, runs the DeployUtil setup steps in a fixed
-  * order, then builds a fresh JobManager and deletes all jobs it knows about.
-  */
- @Before
- public void before() throws Exception {
- this.createTestMetadata();
-
- // deployment steps; presumably order-sensitive (work dir, then metadata, then overrides) - confirm in DeployUtil
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- DeployUtil.overrideJobJarLocations();
- DeployUtil.overrideJobConf(SANDBOX_TEST_DATA);
-
- // start from an empty job list so earlier runs cannot influence this one
- engineConfig = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
- jobManager = new JobManager("Build_Test_Cube_Engine", engineConfig);
- jobManager.deleteAllJobs();
- }
-
- /** Cleans up the test metadata after each test; jobs are deliberately not deleted here. */
- @After
- public void after() throws IOException {
- // jobManager.deleteAllJobs();
- this.cleanupTestMetadata();
- }
-
-    /**
-     * Builds the left-join cubes and then the inner-join cubes while the job engine runs.
-     * The engine is stopped afterwards, also when a build or an assertion fails.
-     */
-    @Test
-    public void testCubes() throws Exception {
-
-        // start job schedule engine
-        jobManager.startJobEngine(10);
-
-        try {
-            // testSimpleLeftJoinCube();
-
-            // keep this order.
-            testLeftJoinCube();
-            testInnerJoinCube();
-        } finally {
-            // a failed build must not leave the engine running after the test
-            jobManager.stopJobEngine();
-        }
-    }
-
- /**
- * For cube test_kylin_cube_with_slr_empty, we will create 2 segments For
- * cube test_kylin_cube_without_slr_empty, since it doesn't support
- * incremental build, we will create only 1 segment (full build)
- *
- * @throws Exception
- */
- private void testInnerJoinCube() throws Exception {
- DeployUtil.prepareTestData("inner", "test_kylin_cube_with_slr_empty");
-
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
- long dateStart;
- long dateEnd;
-
- ArrayList<String> jobs = new ArrayList<String>();
-
- // this cube's start date is 0, end date is 20501112000000
- dateStart = 0;
- dateEnd = f.parse("2013-01-01").getTime();
- jobs.addAll(this.submitJob("test_kylin_cube_with_slr_empty", dateStart, dateEnd, CubeBuildTypeEnum.BUILD));
-
- // this cube doesn't support incremental build, always do full build
- jobs.addAll(this.submitJob("test_kylin_cube_without_slr_empty", 0, 0, CubeBuildTypeEnum.BUILD));
-
- waitCubeBuilt(jobs);
-
- // then submit a incremental job, start date is 20130101000000, end date
- // is 20220101000000
- jobs.clear();
- dateStart = f.parse("2013-01-01").getTime();
- dateEnd = f.parse("2022-01-01").getTime();
- jobs.addAll(this.submitJob("test_kylin_cube_with_slr_empty", dateStart, dateEnd, CubeBuildTypeEnum.BUILD));
- waitCubeBuilt(jobs);
- }
-
- /**
- * For cube test_kylin_cube_without_slr_left_join_empty, it is using
- * update_insert, we will create 2 segments, and then merge these 2 segments
- * into a larger segment For cube test_kylin_cube_with_slr_left_join_empty,
- * we will create only 1 segment
- *
- * @throws Exception
- */
- private void testLeftJoinCube() throws Exception {
- DeployUtil.prepareTestData("left", "test_kylin_cube_with_slr_left_join_empty");
-
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
- long dateStart;
- long dateEnd;
-
- ArrayList<String> jobs = new ArrayList<String>();
-
- // this cube's start date is 0, end date is 20501112000000
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- dateStart = cubeMgr.getCube("test_kylin_cube_with_slr_left_join_empty").getDescriptor().getCubePartitionDesc().getPartitionDateStart();
- dateEnd = f.parse("2050-11-12").getTime();
- jobs.addAll(this.submitJob("test_kylin_cube_with_slr_left_join_empty", dateStart, dateEnd, CubeBuildTypeEnum.BUILD));
-
- // this cube's start date is 0, end date is 20120601000000
- dateStart = cubeMgr.getCube("test_kylin_cube_without_slr_left_join_empty").getDescriptor().getCubePartitionDesc().getPartitionDateStart();
- dateEnd = f.parse("2012-06-01").getTime();
- jobs.addAll(this.submitJob("test_kylin_cube_without_slr_left_join_empty", dateStart, dateEnd, CubeBuildTypeEnum.BUILD));
-
- waitCubeBuilt(jobs);
-
- jobs.clear();
- // then submit a update_insert job, start date is 20120101000000, end
- // date is 20220101000000
- dateStart = f.parse("2012-03-01").getTime();
- dateEnd = f.parse("2022-01-01").getTime();
- jobs.addAll(this.submitJob("test_kylin_cube_without_slr_left_join_empty", dateStart, dateEnd, CubeBuildTypeEnum.BUILD));
-
- waitCubeBuilt(jobs);
-
- jobs.clear();
-
- // final submit a merge job, start date is 0, end date is 20220101000000
- //dateEnd = f.parse("2022-01-01").getTime();
- //jobs.addAll(this.submitJob("test_kylin_cube_without_slr_left_join_empty", 0, dateEnd, CubeBuildTypeEnum.MERGE));
- //waitCubeBuilt(jobs);
- }
-
- @SuppressWarnings("unused")
- private void testSimpleLeftJoinCube() throws Exception {
- DeployUtil.prepareTestData("left", "test_kylin_cube_with_slr_left_join_empty");
-
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
- long dateStart;
- long dateEnd;
-
- ArrayList<String> jobs = new ArrayList<String>();
-
- // this cube's start date is 0, end date is 20501112000000
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- dateStart = cubeMgr.getCube("test_kylin_cube_with_slr_left_join_empty").getDescriptor().getCubePartitionDesc().getPartitionDateStart();
- dateEnd = f.parse("2050-11-12").getTime();
- jobs.addAll(this.submitJob("test_kylin_cube_with_slr_left_join_empty", dateStart, dateEnd, CubeBuildTypeEnum.BUILD));
-
- // this cube's start date is 0, end date is 20501112000000
- dateStart = cubeMgr.getCube("test_kylin_cube_without_slr_left_join_empty").getDescriptor().getCubePartitionDesc().getPartitionDateStart();
- dateEnd = f.parse("2050-11-12").getTime();
- jobs.addAll(this.submitJob("test_kylin_cube_without_slr_left_join_empty", dateStart, dateEnd, CubeBuildTypeEnum.BUILD));
-
- waitCubeBuilt(jobs);
- }
-
- protected void waitCubeBuilt(List<String> jobs) throws Exception {
-
- boolean allFinished = false;
- while (!allFinished) {
- // sleep for 1 minutes
- Thread.sleep(60 * 1000L);
-
- allFinished = true;
- for (String job : jobs) {
- JobInstance savedJob = jobManager.getJob(job);
- JobStatusEnum jobStatus = savedJob.getStatus();
- if (jobStatus.getCode() <= JobStatusEnum.RUNNING.getCode()) {
- allFinished = false;
- break;
- }
- }
- }
-
- for (String job : jobs)
- assertEquals("Job fail - " + job, JobStatusEnum.FINISHED, jobManager.getJob(job).getStatus());
- }
-
- protected List<String> submitJob(String cubename, long startDate, long endDate, CubeBuildTypeEnum jobType) throws SchedulerException, IOException, InvalidJobInstanceException, CubeIntegrityException {
-
- CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- CubeInstance cube = cubeMgr.getCube(cubename);
- CubeManager.getInstance(this.getTestConfig()).loadCubeCache(cube);
-
- System.out.println(JsonUtil.writeValueAsIndentString(cube));
- List<CubeSegment> newSegments = cubeMgr.allocateSegments(cube, jobType, startDate, endDate);
- System.out.println(JsonUtil.writeValueAsIndentString(cube));
-
- List<String> jobUuids = Lists.newArrayList();
- List<JobInstance> jobs = Lists.newArrayList();
- for (CubeSegment seg : newSegments) {
- String uuid = seg.getUuid();
- jobUuids.add(uuid);
- jobs.add(jobManager.createJob(cubename, seg.getName(), uuid, jobType,"KylinTest"));
- seg.setLastBuildJobID(uuid);
- }
- cubeMgr.updateCube(cube);
- for (JobInstance job: jobs) {
- // submit job to store
- jobManager.submitJob(job);
- }
- return jobUuids;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/test/java/com/kylinolap/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/com/kylinolap/job/DeployUtil.java b/job/src/test/java/com/kylinolap/job/DeployUtil.java
deleted file mode 100644
index 80774b5..0000000
--- a/job/src/test/java/com/kylinolap/job/DeployUtil.java
+++ /dev/null
@@ -1,216 +0,0 @@
-package com.kylinolap.job;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.maven.model.Model;
-import org.apache.maven.model.io.xpp3.MavenXpp3Reader;
-import org.codehaus.plexus.util.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.persistence.ResourceTool;
-import com.kylinolap.common.util.AbstractKylinTestCase;
-import com.kylinolap.common.util.CliCommandExecutor;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.dataGen.FactTableGenerator;
-import com.kylinolap.job.engine.JobEngineConfig;
-import com.kylinolap.job.hadoop.hive.SqlHiveDataTypeMapping;
-import com.kylinolap.job.tools.LZOSupportnessChecker;
-import com.kylinolap.metadata.MetadataManager;
-import com.kylinolap.metadata.model.schema.ColumnDesc;
-import com.kylinolap.metadata.model.schema.TableDesc;
-
-public class DeployUtil {
- @SuppressWarnings("unused")
- private static final Logger logger = LoggerFactory.getLogger(DeployUtil.class);
-
- public static void initCliWorkDir() throws IOException {
- execCliCommand("rm -rf " + getHadoopCliWorkingDir());
- execCliCommand("mkdir -p " + config().getKylinJobLogDir());
- }
-
- public static void deployMetadata() throws IOException {
- // install metadata to hbase
- ResourceTool.reset(config());
- ResourceTool.copy(KylinConfig.createInstanceFromUri(AbstractKylinTestCase.LOCALMETA_TEST_DATA), config());
-
- // update cube desc signature.
- for (CubeInstance cube : CubeManager.getInstance(config()).listAllCubes()) {
- cube.getDescriptor().setSignature(cube.getDescriptor().calculateSignature());
- CubeManager.getInstance(config()).updateCube(cube);
- }
- }
-
- public static void overrideJobJarLocations() {
- Pair<File, File> files = getJobJarFiles();
- File jobJar = files.getFirst();
- File coprocessorJar = files.getSecond();
-
- config().overrideKylinJobJarPath(jobJar.getAbsolutePath());
- config().overrideCoprocessorLocalJar(coprocessorJar.getAbsolutePath());
- }
-
- public static void deployJobJars() throws IOException {
- Pair<File, File> files = getJobJarFiles();
- File jobJar = files.getFirst();
- File coprocessorJar = files.getSecond();
-
- File jobJarRemote = new File(config().getKylinJobJarPath());
- File jobJarLocal = new File(jobJar.getParentFile(), jobJarRemote.getName());
- if (jobJar.equals(jobJarLocal) == false) {
- FileUtils.copyFile(jobJar, jobJarLocal);
- }
-
- File coprocessorJarRemote = new File(config().getCoprocessorLocalJar());
- File coprocessorJarLocal = new File(coprocessorJar.getParentFile(), coprocessorJarRemote.getName());
- if (coprocessorJar.equals(coprocessorJarLocal) == false) {
- FileUtils.copyFile(coprocessorJar, coprocessorJarLocal);
- }
-
- CliCommandExecutor cmdExec = config().getCliCommandExecutor();
- cmdExec.copyFile(jobJarLocal.getAbsolutePath(), jobJarRemote.getParent());
- cmdExec.copyFile(coprocessorJar.getAbsolutePath(), coprocessorJarRemote.getParent());
- }
-
- private static Pair<File, File> getJobJarFiles() {
- String version;
- try {
- MavenXpp3Reader pomReader = new MavenXpp3Reader();
- Model model = pomReader.read(new FileReader("../pom.xml"));
- version = model.getVersion();
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
-
- File jobJar = new File("../job/target", "kylin-job-" + version + "-job.jar");
- File coprocessorJar = new File("../storage/target", "kylin-storage-" + version + "-coprocessor.jar");
- return new Pair<File, File>(jobJar, coprocessorJar);
- }
-
- public static void overrideJobConf(String confDir) throws IOException {
- boolean enableLzo = LZOSupportnessChecker.getSupportness();
- overrideJobConf(confDir, enableLzo);
- }
-
- public static void overrideJobConf(String confDir, boolean enableLzo) throws IOException {
- File src = new File(confDir, JobEngineConfig.HADOOP_JOB_CONF_FILENAME + (enableLzo ? ".lzo_enabled" : ".lzo_disabled") + ".xml");
- File dst = new File(confDir, JobEngineConfig.HADOOP_JOB_CONF_FILENAME + ".xml");
- FileUtils.copyFile(src, dst);
- }
-
- private static void execCliCommand(String cmd) throws IOException {
- config().getCliCommandExecutor().execute(cmd);
- }
-
- private static String getHadoopCliWorkingDir() {
- return config().getCliWorkingDir();
- }
-
- private static KylinConfig config() {
- return KylinConfig.getInstanceFromEnv();
- }
-
- // ============================================================================
-
- static final String TABLE_CAL_DT = "test_cal_dt";
- static final String TABLE_CATEGORY_GROUPINGS = "test_category_groupings";
- static final String TABLE_KYLIN_FACT = "test_kylin_fact";
- static final String TABLE_SELLER_TYPE_DIM = "test_seller_type_dim";
- static final String TABLE_SITES = "test_sites";
-
- static final String[] TABLE_NAMES = new String[] { TABLE_CAL_DT, TABLE_CATEGORY_GROUPINGS, TABLE_KYLIN_FACT, TABLE_SELLER_TYPE_DIM, TABLE_SITES };
-
- public static void prepareTestData(String joinType, String cubeName) throws Exception {
- // data is generated according to cube descriptor and saved in resource store
- if (joinType.equalsIgnoreCase("inner")) {
- FactTableGenerator.generate(cubeName, "10000", "1", null, "inner");
- } else if (joinType.equalsIgnoreCase("left")) {
- FactTableGenerator.generate(cubeName, "10000", "0.6", null, "left");
- } else {
- throw new IllegalArgumentException("Unsupported join type : " + joinType);
- }
-
- deployHiveTables();
- }
-
- private static void deployHiveTables() throws Exception {
-
- MetadataManager metaMgr = MetadataManager.getInstance(config());
-
- // scp data files, use the data from hbase, instead of local files
- File temp = File.createTempFile("temp", ".csv");
- temp.createNewFile();
- for (String tablename : TABLE_NAMES) {
- tablename = tablename.toUpperCase();
-
- File localBufferFile = new File(temp.getParent() + "/" + tablename + ".csv");
- localBufferFile.createNewFile();
-
- InputStream hbaseDataStream = metaMgr.getStore().getResource("/data/" + tablename + ".csv");
- FileOutputStream localFileStream = new FileOutputStream(localBufferFile);
- IOUtils.copy(hbaseDataStream, localFileStream);
-
- hbaseDataStream.close();
- localFileStream.close();
-
- config().getCliCommandExecutor().copyFile(localBufferFile.getPath(), config().getCliWorkingDir());
- localBufferFile.delete();
- }
- temp.delete();
-
- // create hive tables
- execHiveCommand(generateCreateTableHql(metaMgr.getTableDesc(TABLE_CAL_DT.toUpperCase())));
- execHiveCommand(generateCreateTableHql(metaMgr.getTableDesc(TABLE_CATEGORY_GROUPINGS.toUpperCase())));
- execHiveCommand(generateCreateTableHql(metaMgr.getTableDesc(TABLE_KYLIN_FACT.toUpperCase())));
- execHiveCommand(generateCreateTableHql(metaMgr.getTableDesc(TABLE_SELLER_TYPE_DIM.toUpperCase())));
- execHiveCommand(generateCreateTableHql(metaMgr.getTableDesc(TABLE_SITES.toUpperCase())));
-
- // load data to hive tables
- // LOAD DATA LOCAL INPATH 'filepath' [OVERWRITE] INTO TABLE tablename
- execHiveCommand(generateLoadDataHql(TABLE_CAL_DT));
- execHiveCommand(generateLoadDataHql(TABLE_CATEGORY_GROUPINGS));
- execHiveCommand(generateLoadDataHql(TABLE_KYLIN_FACT));
- execHiveCommand(generateLoadDataHql(TABLE_SELLER_TYPE_DIM));
- execHiveCommand(generateLoadDataHql(TABLE_SITES));
- }
-
- private static void execHiveCommand(String hql) throws IOException {
- String hiveCmd = "hive -e \"" + hql + "\"";
- config().getCliCommandExecutor().execute(hiveCmd);
- }
-
- private static String generateLoadDataHql(String tableName) {
- return "LOAD DATA LOCAL INPATH '" + config().getCliWorkingDir() + "/" + tableName.toUpperCase() + ".csv' OVERWRITE INTO TABLE " + tableName.toUpperCase();
- }
-
- private static String generateCreateTableHql(TableDesc tableDesc) {
- StringBuilder ddl = new StringBuilder();
-
- ddl.append("DROP TABLE IF EXISTS " + tableDesc.getName() + ";\n");
- ddl.append("CREATE TABLE " + tableDesc.getName() + "\n");
- ddl.append("(" + "\n");
-
- for (int i = 0; i < tableDesc.getColumns().length; i++) {
- ColumnDesc col = tableDesc.getColumns()[i];
- if (i > 0) {
- ddl.append(",");
- }
- ddl.append(col.getName() + " " + SqlHiveDataTypeMapping.getHiveDataType((col.getDatatype())) + "\n");
- }
-
- ddl.append(")" + "\n");
- ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY ','" + "\n");
- ddl.append("STORED AS TEXTFILE;");
-
- return ddl.toString();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/test/java/com/kylinolap/job/JobDAOTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/com/kylinolap/job/JobDAOTest.java b/job/src/test/java/com/kylinolap/job/JobDAOTest.java
deleted file mode 100644
index f09c68e..0000000
--- a/job/src/test/java/com/kylinolap/job/JobDAOTest.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job;
-
-import static org.junit.Assert.*;
-
-import java.io.IOException;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.SerializationFeature;
-import com.kylinolap.common.util.LocalFileMetadataTestCase;
-import com.kylinolap.cube.CubeBuildTypeEnum;
-import com.kylinolap.job.exception.InvalidJobInstanceException;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class JobDAOTest extends LocalFileMetadataTestCase {
-
- @Before
- public void setup() throws Exception {
- createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- cleanupTestMetadata();
- }
-
- @Test
- public void test() throws IOException, InvalidJobInstanceException {
-
- String uuid = "132432cb-8c68-42d8-aa3a-504151b39d1b";
- JobDAO service = JobDAO.getInstance(getTestConfig());
- JobInstance job = createDumbJobInstance(uuid);
- assertEquals(0, job.getLastModified());
- service.updateJobInstance(job);
-
- // test read
- JobInstance job2 = service.getJob(uuid);
- // assertEquals(JobStatusEnum.PENDING, job2.getStatus());
- assertTrue(job2.getLastModified() > 0);
-
- // test modify
- job2.setRelatedCube("abc");
- service.updateJobInstance(job2);
- JobInstance job3 = service.getJob(uuid);
- assertEquals("abc", job3.getRelatedCube());
- assertTrue(job3.getLastModified() > 0);
-
- // test delete
- service.deleteJob(job2);
- JobInstance job4 = service.getJob(uuid);
- assertNull(job4);
- }
-
- @Test
- public void testOutput() throws IOException, InvalidJobInstanceException {
- String uuid = "132432cb-8c68-42d8-aa3a-504151b39d1b";
- int seq = 1;
- String s = "this is output";
- JobDAO service = JobDAO.getInstance(getTestConfig());
- service.saveJobOutput(uuid, seq, s);
-
- // test read
- JobStepOutput output2 = service.getJobOutput(uuid, seq);
- assertTrue(output2.getLastModified() > 0);
- assertEquals(s, output2.getOutput());
-
- }
-
- private JobInstance createDumbJobInstance(String uuid) {
- try {
- ObjectMapper mapper = new ObjectMapper();
- mapper.configure(SerializationFeature.INDENT_OUTPUT, true);
-
- JobInstance jobInstance = new JobInstance();
- jobInstance.setUuid(uuid);
- jobInstance.setType(CubeBuildTypeEnum.BUILD);
- jobInstance.setRelatedCube("test_kylin_cube_with_slr".toUpperCase());
- jobInstance.setName("Dummy_Job");
- // jobInstance.setStatus(JobStatusEnum.PENDING);
-
- return jobInstance;
- } catch (Exception e) {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/test/java/com/kylinolap/job/JobInstanceTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/com/kylinolap/job/JobInstanceTest.java b/job/src/test/java/com/kylinolap/job/JobInstanceTest.java
deleted file mode 100644
index a1c04ef..0000000
--- a/job/src/test/java/com/kylinolap/job/JobInstanceTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job;
-
-import static org.junit.Assert.*;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.LocalFileMetadataTestCase;
-import com.kylinolap.cube.CubeBuildTypeEnum;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.engine.JobEngineConfig;
-
-import java.util.UUID;
-
-/**
- * @author ysong1
- *
- */
-public class JobInstanceTest extends LocalFileMetadataTestCase {
- @Before
- public void before() throws Exception {
- this.createTestMetadata();
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- @Test
- public void testJobInstanceStatus() throws Exception {
- KylinConfig kylinCfg = KylinConfig.getInstanceFromEnv();
- JobManager jobManager = new JobManager("JobInstanceTest", new JobEngineConfig(kylinCfg));
-
- JobInstance jobInstance = jobManager.createJob("test_kylin_cube_with_slr_1_new_segment", "20130331080000_20131212080000", UUID.randomUUID().toString(), CubeBuildTypeEnum.BUILD,"KylinTest");
- // initial job status should be PENDING
- assertEquals(JobStatusEnum.PENDING, jobInstance.getStatus());
-
- // if a step fails, job status should be ERROR
- jobInstance.getSteps().get(3).setStatus(JobStepStatusEnum.ERROR);
- assertEquals(JobStatusEnum.ERROR, jobInstance.getStatus());
-
- // then resume job, job status should be NEW
- jobInstance.getSteps().get(0).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(1).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(2).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(3).setStatus(JobStepStatusEnum.PENDING);
- assertEquals(JobStatusEnum.PENDING, jobInstance.getStatus());
-
- // running job
- jobInstance.getSteps().get(0).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(1).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(2).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(3).setStatus(JobStepStatusEnum.RUNNING);
- assertEquals(JobStatusEnum.RUNNING, jobInstance.getStatus());
-
- // kill job
- jobInstance.getSteps().get(0).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(1).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(2).setStatus(JobStepStatusEnum.FINISHED);
- jobInstance.getSteps().get(3).setStatus(JobStepStatusEnum.DISCARDED);
- assertEquals(JobStatusEnum.DISCARDED, jobInstance.getStatus());
-
- // finish job
- for (JobStep step : jobInstance.getSteps()) {
- step.setStatus(JobStepStatusEnum.FINISHED);
- }
- assertEquals(JobStatusEnum.FINISHED, jobInstance.getStatus());
-
- // finish job
- for (JobStep step : jobInstance.getSteps()) {
- step.setStatus(JobStepStatusEnum.NEW);
- }
- assertEquals(JobStatusEnum.NEW, jobInstance.getStatus());
-
- // default
- jobInstance.getSteps().get(3).setStatus(JobStepStatusEnum.WAITING);
- assertEquals(JobStatusEnum.RUNNING, jobInstance.getStatus());
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/test/java/com/kylinolap/job/SampleCubeSetupTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/com/kylinolap/job/SampleCubeSetupTest.java b/job/src/test/java/com/kylinolap/job/SampleCubeSetupTest.java
deleted file mode 100644
index 30ed697..0000000
--- a/job/src/test/java/com/kylinolap/job/SampleCubeSetupTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package com.kylinolap.job;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.commons.lang3.StringUtils;
-import org.codehaus.plexus.util.FileUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.ClasspathUtil;
-import com.kylinolap.common.util.HBaseMetadataTestCase;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.job.engine.JobEngineConfig;
-
-/**
- * Created by honma on 9/24/14.
- * <p/>
- * This class is only used for building a sample cube in the one-line deployment tool.
- */
-public class SampleCubeSetupTest extends HBaseMetadataTestCase {
-
- @Before
- public void before() throws Exception {
-
- try {
- this.testConnectivity();
- } catch (Exception e) {
- System.out.println("Failed to connect to remote CLI with given password");
- throw e;
- }
-
- String confPaths = System.getenv("KYLIN_HBASE_CONF_PATH");
- System.out.println("The conf paths is " + confPaths);
- if (confPaths != null) {
- String[] paths = confPaths.split(":");
- for (String path : paths) {
- if (!StringUtils.isEmpty(path)) {
- try {
- ClasspathUtil.addClasspath(new File(path).getAbsolutePath());
- } catch (Exception e) {
- System.out.println(e.getLocalizedMessage());
- System.out.println(e.getStackTrace());
- }
- }
- }
- }
- }
-
- private void testConnectivity() throws Exception {
- KylinConfig cfg = KylinConfig.getInstanceFromEnv();
- cfg.getCliCommandExecutor().execute("echo hello");
- }
-
- @Test
- public void testCubes() throws Exception {
- DeployUtil.initCliWorkDir();
- DeployUtil.deployMetadata();
- DeployUtil.deployJobJars();
- deployJobConfToEtc();
- DeployUtil.prepareTestData("inner", "test_kylin_cube_with_slr_empty");
-
- // remove all other cubes to keep it clean
- CubeManager cubeManager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
- for (CubeInstance cubeInstance : cubeManager.listAllCubes()) {
- if (!cubeInstance.getName().equalsIgnoreCase("test_kylin_cube_without_slr_empty") && !cubeInstance.getName().equalsIgnoreCase("test_kylin_cube_with_slr_empty"))
- cubeManager.dropCube(cubeInstance.getName(), false);
- }
-
- }
-
- private void deployJobConfToEtc() throws IOException {
- String lzoSupportness = System.getenv("KYLIN_LZO_SUPPORTED");
- boolean enableLzo = "true".equalsIgnoreCase(lzoSupportness);
- DeployUtil.overrideJobConf(SANDBOX_TEST_DATA, enableLzo);
-
- File src = new File(SANDBOX_TEST_DATA, JobEngineConfig.HADOOP_JOB_CONF_FILENAME + ".xml");
- File dst = new File("/etc/kylin", src.getName());
- FileUtils.copyFile(src, dst);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/test/java/com/kylinolap/job/engine/GenericJobEngineTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/com/kylinolap/job/engine/GenericJobEngineTest.java b/job/src/test/java/com/kylinolap/job/engine/GenericJobEngineTest.java
deleted file mode 100644
index 47974fc..0000000
--- a/job/src/test/java/com/kylinolap/job/engine/GenericJobEngineTest.java
+++ /dev/null
@@ -1,441 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package com.kylinolap.job.engine;
-
-import static org.junit.Assert.*;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.AbstractKylinTestCase;
-import com.kylinolap.common.util.JsonUtil;
-import com.kylinolap.common.util.SSHClient;
-import com.kylinolap.cube.CubeBuildTypeEnum;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.CubeSegmentStatusEnum;
-import com.kylinolap.cube.exception.CubeIntegrityException;
-import com.kylinolap.cube.project.ProjectManager;
-import com.kylinolap.job.JobDAO;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.JobManager;
-import com.kylinolap.job.constant.JobConstants;
-import com.kylinolap.job.constant.JobStatusEnum;
-import com.kylinolap.job.constant.JobStepCmdTypeEnum;
-import com.kylinolap.job.constant.JobStepStatusEnum;
-import com.kylinolap.job.exception.InvalidJobInstanceException;
-import com.kylinolap.metadata.MetadataManager;
-
-/**
- * @author ysong1
- */
-public class GenericJobEngineTest {
- private static String cubeName = "test_kylin_cube_with_slr_empty";
-
- private static String tempTestMetadataUrl = "../examples/sample_metadata";
- private static JobManager jobManager;
-
- private static JobDAO jobDAO;
-
- private static String mrInputDir = "/tmp/mapredsmokeinput";
- private static String mrOutputDir1 = "/tmp/mapredsmokeoutput1";
- private static String mrOutputDir2 = "/tmp/mapredsmokeoutput2";
- private static String mrCmd = "hadoop --config /etc/hadoop/conf jar /usr/lib/hadoop-mapreduce/hadoop-mapreduce-examples-2.*.jar wordcount " + mrInputDir + " ";
-
- public static void removeHdfsDir(SSHClient hadoopCli, String hdfsDir) throws Exception {
- String cmd = "hadoop fs -rm -f -r " + hdfsDir;
- hadoopCli.execCommand(cmd);
- }
-
- public static String getHadoopCliHostname() {
- return getHadoopCliAsRemote() ? KylinConfig.getInstanceFromEnv().getRemoteHadoopCliHostname() : null;
- }
-
- public static String getHadoopCliUsername() {
- return getHadoopCliAsRemote() ? KylinConfig.getInstanceFromEnv().getRemoteHadoopCliUsername() : null;
- }
-
- public static String getHadoopCliPassword() {
- return getHadoopCliAsRemote() ? KylinConfig.getInstanceFromEnv().getRemoteHadoopCliPassword() : null;
- }
-
- public static boolean getHadoopCliAsRemote() {
- return KylinConfig.getInstanceFromEnv().getRunAsRemoteCommand();
- }
-
- public static void scpFilesToHdfs(SSHClient hadoopCli, String[] localFiles, String hdfsDir) throws Exception {
- String remoteTempDir = "/tmp/";
-
- List<String> nameList = new ArrayList<String>();
- for (int i = 0; i < localFiles.length; i++) {
- File file = new File(localFiles[i]);
- String filename = file.getName();
- nameList.add(filename);
- }
- for (String f : localFiles) {
- hadoopCli.scpFileToRemote(f, remoteTempDir);
- }
- for (String f : nameList) {
- hadoopCli.execCommand("hadoop fs -mkdir -p " + hdfsDir);
- hadoopCli.execCommand("hadoop fs -put -f " + remoteTempDir + f + " " + hdfsDir + "/" + f);
- hadoopCli.execCommand("hadoop fs -chmod 777 " + hdfsDir + "/" + f);
- }
- }
-
- @BeforeClass
- public static void beforeClass() throws Exception {
-
- FileUtils.forceMkdir(new File(KylinConfig.getInstanceFromEnv().getKylinJobLogDir()));
-
- FileUtils.deleteDirectory(new File(tempTestMetadataUrl));
- FileUtils.copyDirectory(new File(AbstractKylinTestCase.LOCALMETA_TEST_DATA), new File(tempTestMetadataUrl));
- System.setProperty(KylinConfig.KYLIN_CONF, tempTestMetadataUrl);
-
- // deploy files to hdfs
- SSHClient hadoopCli = new SSHClient(getHadoopCliHostname(), getHadoopCliUsername(), getHadoopCliPassword(), null);
- scpFilesToHdfs(hadoopCli, new String[] { "src/test/resources/json/dummy_jobinstance.json" }, mrInputDir);
- // deploy sample java jar
- hadoopCli.scpFileToRemote("src/test/resources/jarfile/SampleJavaProgram.jarfile", "/tmp");
- hadoopCli.scpFileToRemote("src/test/resources/jarfile/SampleBadJavaProgram.jarfile", "/tmp");
-
- // create log dir
- hadoopCli.execCommand("mkdir -p " + KylinConfig.getInstanceFromEnv().getKylinJobLogDir());
- KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
- kylinConfig.setMetadataUrl(tempTestMetadataUrl);
-
- jobManager = new JobManager("GenericJobEngineTest", new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
-
- jobDAO = JobDAO.getInstance(KylinConfig.getInstanceFromEnv());
-
- jobDAO.updateJobInstance(createARunningJobInstance("a_running_job"));
-
- jobManager.startJobEngine(2);
- Thread.sleep(2000);
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- FileUtils.deleteDirectory(new File(tempTestMetadataUrl));
- System.clearProperty(KylinConfig.KYLIN_CONF);
-
- // print metrics
- System.out.println("Job step duration seconds 80 percentile: " + jobManager.getPercentileJobStepDuration(80));
- System.out.println("Max job step duration seconds: " + jobManager.getMaxJobStepDuration());
- System.out.println("Min job step duration seconds: " + jobManager.getMinJobStepDuration());
- System.out.println("# job steps executed: " + jobManager.getNumberOfJobStepsExecuted());
- System.out.println("Engine ID: " + jobManager.getPrimaryEngineID());
-
- jobManager.stopJobEngine();
-
- }
-
- @Before
- public void before() throws Exception {
- SSHClient hadoopCli = new SSHClient(getHadoopCliHostname(), getHadoopCliUsername(), getHadoopCliPassword(), null);
- removeHdfsDir(hadoopCli, mrOutputDir1);
- removeHdfsDir(hadoopCli, mrOutputDir2);
-
- MetadataManager.removeInstance(KylinConfig.getInstanceFromEnv());
- CubeManager.removeInstance(KylinConfig.getInstanceFromEnv());
- ProjectManager.removeInstance(KylinConfig.getInstanceFromEnv());
- }
-
- @Test(expected = InvalidJobInstanceException.class)
- public void testSubmitDuplicatedJobs() throws IOException, InvalidJobInstanceException, CubeIntegrityException {
- String uuid = "bad_job_2";
- JobInstance job = createASingleStepBadJobInstance(uuid);
- // job.setStatus(JobStatusEnum.KILLED);
- jobManager.submitJob(job);
- jobManager.submitJob(job);
- }
-
- @Test
- public void testGoodJob() throws Exception {
- String uuid = "good_job_uuid";
- jobManager.submitJob(createAGoodJobInstance(uuid, 5));
-
- waitUntilJobComplete(uuid);
-
- JobInstance savedJob1 = jobManager.getJob(uuid);
- assertEquals(JobStatusEnum.FINISHED, savedJob1.getStatus());
- String jobString = JsonUtil.writeValueAsIndentString(savedJob1);
- System.out.println(jobString);
- assertTrue(jobString.length() > 0);
-
- // cube should be updated
- CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- String cubeString = JsonUtil.writeValueAsIndentString(cube);
- System.out.println(cubeString);
- assertEquals("good_job_uuid", cube.getSegments().get(0).getLastBuildJobID());
- assertTrue(cube.getSegments().get(0).getSizeKB() > 0);
- }
-
- @Test
- public void testSingleStepBadJob() throws Exception {
- String uuid = "single_step_bad_job_uuid";
- jobManager.submitJob(createASingleStepBadJobInstance(uuid));
-
- waitUntilJobComplete(uuid);
-
- JobInstance savedJob1 = jobManager.getJob(uuid);
- assertEquals(JobStatusEnum.ERROR, savedJob1.getStatus());
- String jobString = JsonUtil.writeValueAsIndentString(savedJob1);
- System.out.println(jobString);
- assertTrue(jobString.length() > 0);
-
- // cube should be updated
- CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- String cubeString = JsonUtil.writeValueAsIndentString(cube);
- System.out.println(cubeString);
- assertEquals(CubeSegmentStatusEnum.NEW, cube.getSegments().get(0).getStatus());
- }
-
- @Test
- public void testSelfCheckAndResume() throws Exception {
- String uuid = "a_running_job";
- assertEquals(JobStatusEnum.ERROR, jobManager.getJob(uuid).getStatus());
- // check step status
- assertEquals(JobStepStatusEnum.FINISHED, jobManager.getJob(uuid).getSteps().get(0).getStatus());
- assertEquals(JobStepStatusEnum.ERROR, jobManager.getJob(uuid).getSteps().get(1).getStatus());
- assertEquals(JobStepStatusEnum.PENDING, jobManager.getJob(uuid).getSteps().get(2).getStatus());
-
- jobManager.resumeJob(uuid);
-
- waitUntilJobComplete(uuid);
- assertEquals(JobStatusEnum.FINISHED, jobManager.getJob(uuid).getStatus());
- }
-
- @Test
- public void testDiscardSyncStep() throws Exception {
- String uuid = "a_long_running_good_job_uuid";
- JobInstance job = createAGoodJobInstance(uuid, 600);
- jobManager.submitJob(job);
-
- try {
- // sleep for 5 seconds
- Thread.sleep(5L * 1000L);
- } catch (Exception e) {
- }
-
- try {
- jobManager.discardJob(uuid);
- } catch (RuntimeException e) {
- throw e;
- }
-
- waitUntilJobComplete(uuid);
-
- JobInstance killedJob = jobManager.getJob(uuid);
- assertEquals(JobStepStatusEnum.DISCARDED, killedJob.getSteps().get(0).getStatus());
- assertEquals(JobStatusEnum.DISCARDED, killedJob.getStatus());
- }
-
- @Test
- public void testKillMrStep() throws Exception {
- String uuid = "a_long_running_good_job_uuid_2";
- JobInstance job = createAGoodJobInstance(uuid, 1);
- jobManager.submitJob(job);
-
- try {
- waitUntilMrStepIsRunning(uuid);
- jobManager.discardJob(uuid);
- } catch (RuntimeException e) {
- throw e;
- }
-
- waitUntilJobComplete(uuid);
-
- JobInstance killedJob = jobManager.getJob(uuid);
- assertEquals(JobStepStatusEnum.ERROR, killedJob.getSteps().get(1).getStatus());
- assertEquals(JobStatusEnum.ERROR, killedJob.getStatus());
-
- // cube should be updated
- CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- String cubeString = JsonUtil.writeValueAsIndentString(cube);
- System.out.println(cubeString);
- assertEquals(0, cube.getSegments().size());
- }
-
- private void waitUntilMrStepIsRunning(String jobUuid) throws InterruptedException, IOException {
- boolean running = false;
- while (running == false) {
- // sleep for 1 seconds
- Thread.sleep(1 * 1000L);
-
- JobInstance savedJob = jobManager.getJob(jobUuid);
- for (JobStep step : savedJob.getSteps()) {
- if (step.getCmdType().equals(JobStepCmdTypeEnum.SHELL_CMD_HADOOP) && step.getStatus().equals(JobStepStatusEnum.RUNNING) && step.getInfo(JobInstance.MR_JOB_ID) != null) {
- System.out.println("MR step is running with id " + step.getInfo(JobInstance.MR_JOB_ID));
- running = true;
- break;
- }
- }
-
- }
-
- }
-
- private void waitUntilJobComplete(String jobUuid) throws IOException, InterruptedException {
- boolean finished = false;
- while (!finished) {
- // sleep for 5 seconds
- Thread.sleep(5 * 1000L);
-
- finished = true;
-
- JobInstance savedJob = jobManager.getJob(jobUuid);
- JobStatusEnum jobStatus = savedJob.getStatus();
- System.out.println("Job " + jobUuid + " status: " + jobStatus);
- if (jobStatus.getCode() <= JobStatusEnum.RUNNING.getCode()) {
- finished = false;
- }
- }
- }
-
- private JobInstance createAGoodJobInstance(String uuid, int syncCmdSleepSeconds) throws IOException, CubeIntegrityException {
- CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- cube.getSegments().clear();
- CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cube);
- CubeSegment seg = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).allocateSegments(cube, CubeBuildTypeEnum.BUILD, 0, 12345L).get(0);
-
- JobInstance jobInstance = new JobInstance();
- jobInstance.setUuid(uuid);
- jobInstance.setRelatedCube(cubeName);
- jobInstance.setRelatedSegment(seg.getName());
- jobInstance.setName("A_Good_Job");
- // jobInstance.setStatus(JobStatusEnum.PENDING);
- jobInstance.setType(CubeBuildTypeEnum.BUILD);
- // jobInstance.putInputParameter(JobConstants.PROP_STORAGE_LOCATION,
- // "htablename");
-
- JobStep step1 = new JobStep();
- step1.setName("step1");
- step1.setExecCmd("java -jar /tmp/SampleJavaProgram.jarfile " + syncCmdSleepSeconds);
- step1.setStatus(JobStepStatusEnum.PENDING);
- step1.setSequenceID(0);
- step1.setCmdType(JobStepCmdTypeEnum.SHELL_CMD);
- step1.setRunAsync(false);
-
- // async mr step
- JobStep step2 = new JobStep();
- step2.setName(JobConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
- step2.setExecCmd(mrCmd + mrOutputDir1);
- step2.setStatus(JobStepStatusEnum.PENDING);
- step2.setSequenceID(1);
- step2.setCmdType(JobStepCmdTypeEnum.SHELL_CMD_HADOOP);
- step2.setRunAsync(true);
-
- // async mr step
- JobStep step3 = new JobStep();
- step3.setName("synced mr step");
- step3.setExecCmd(mrCmd + mrOutputDir2);
- step3.setStatus(JobStepStatusEnum.PENDING);
- step3.setSequenceID(2);
- step3.setCmdType(JobStepCmdTypeEnum.SHELL_CMD_HADOOP);
- step3.setRunAsync(false);
-
- jobInstance.addStep(0, step1);
- jobInstance.addStep(1, step2);
- jobInstance.addStep(2, step3);
- return jobInstance;
- }
-
- private JobInstance createASingleStepBadJobInstance(String uuid) throws IOException, CubeIntegrityException {
- CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- cube.getSegments().clear();
- CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cube);
- CubeSegment seg = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).allocateSegments(cube, CubeBuildTypeEnum.BUILD, 0, 12345L).get(0);
-
- JobInstance jobInstance = new JobInstance();
- jobInstance.setUuid(uuid);
- jobInstance.setRelatedCube(cubeName);
- jobInstance.setRelatedSegment(seg.getName());
- jobInstance.setName("A_Bad_Job");
- // jobInstance.setStatus(JobStatusEnum.PENDING);
- jobInstance.setType(CubeBuildTypeEnum.BUILD);
- // jobInstance.putInputParameter(JobConstants.PROP_STORAGE_LOCATION,
- // "htablename");
-
- JobStep step1 = new JobStep();
- step1.setName("step1");
- step1.setExecCmd("java -jar /tmp/SampleBadJavaProgram.jarfile");
- step1.setStatus(JobStepStatusEnum.PENDING);
- step1.setSequenceID(0);
- step1.setRunAsync(false);
- step1.setCmdType(JobStepCmdTypeEnum.SHELL_CMD);
- jobInstance.addStep(0, step1);
-
- return jobInstance;
- }
-
- private static JobInstance createARunningJobInstance(String uuid) throws IOException, CubeIntegrityException {
- CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
- cube.getSegments().clear();
- CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).updateCube(cube);
- CubeSegment seg = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).allocateSegments(cube, CubeBuildTypeEnum.BUILD, 0, 12345L).get(0);
-
- JobInstance jobInstance = new JobInstance();
- jobInstance.setUuid(uuid);
- jobInstance.setRelatedCube(cubeName);
- jobInstance.setRelatedSegment(seg.getName());
- jobInstance.setName("A_Running_Job");
- // jobInstance.setStatus(JobStatusEnum.RUNNING);
- jobInstance.setType(CubeBuildTypeEnum.BUILD);
- // jobInstance.putInputParameter(JobConstants.PROP_STORAGE_LOCATION,
- // "htablename");
-
- JobStep step1 = new JobStep();
- step1.setName("step1");
- step1.setExecCmd("echo step1");
- step1.setStatus(JobStepStatusEnum.FINISHED);
- step1.setSequenceID(0);
- step1.setRunAsync(false);
- step1.setCmdType(JobStepCmdTypeEnum.SHELL_CMD);
-
- JobStep step2 = new JobStep();
- step2.setName("step2");
- step2.setExecCmd(mrCmd + mrOutputDir1);
- step2.setStatus(JobStepStatusEnum.RUNNING);
- step2.setSequenceID(1);
- step2.setRunAsync(true);
- step2.setCmdType(JobStepCmdTypeEnum.SHELL_CMD_HADOOP);
-
- JobStep step3 = new JobStep();
- step3.setName("step3");
- step3.setExecCmd("java -jar /tmp/SampleJavaProgram.jarfile 3");
- step3.setStatus(JobStepStatusEnum.PENDING);
- step3.setSequenceID(2);
- step3.setRunAsync(false);
- step3.setCmdType(JobStepCmdTypeEnum.SHELL_CMD);
-
- jobInstance.addStep(0, step1);
- jobInstance.addStep(1, step2);
- jobInstance.addStep(2, step3);
- return jobInstance;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/a4fd4268/job/src/test/java/com/kylinolap/job/engine/JobInstanceBuilderTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/com/kylinolap/job/engine/JobInstanceBuilderTest.java b/job/src/test/java/com/kylinolap/job/engine/JobInstanceBuilderTest.java
deleted file mode 100644
index 3828cd2..0000000
--- a/job/src/test/java/com/kylinolap/job/engine/JobInstanceBuilderTest.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Copyright 2013-2014 eBay Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.kylinolap.job.engine;
-
-import static org.junit.Assert.*;
-
-import java.text.SimpleDateFormat;
-import java.util.TimeZone;
-import java.util.UUID;
-
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import com.kylinolap.common.KylinConfig;
-import com.kylinolap.common.util.JsonUtil;
-import com.kylinolap.common.util.LocalFileMetadataTestCase;
-import com.kylinolap.cube.CubeBuildTypeEnum;
-import com.kylinolap.cube.CubeInstance;
-import com.kylinolap.cube.CubeManager;
-import com.kylinolap.cube.CubeSegment;
-import com.kylinolap.cube.project.ProjectManager;
-import com.kylinolap.dict.DictionaryManager;
-import com.kylinolap.job.JobInstance;
-import com.kylinolap.job.JobInstance.JobStep;
-import com.kylinolap.job.JobManager;
-import com.kylinolap.job.constant.JobStepCmdTypeEnum;
-import com.kylinolap.metadata.MetadataManager;
-
-/**
- * @author George Song (ysong1)
- *
- */
-public class JobInstanceBuilderTest extends LocalFileMetadataTestCase {
-
- @Before
- public void before() throws Exception {
- this.createTestMetadata();
- MetadataManager.removeInstance(this.getTestConfig());
- CubeManager.removeInstance(this.getTestConfig());
- ProjectManager.removeInstance(this.getTestConfig());
- DictionaryManager.removeInstance(this.getTestConfig());
- }
-
- @After
- public void after() throws Exception {
- this.cleanupTestMetadata();
- }
-
- @Test
- public void testCreateSteps() throws Exception {
- // create a new cube
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
- f.setTimeZone(TimeZone.getTimeZone("GMT"));
-
- long dateEnd = f.parse("2013-11-12").getTime();
-
- JobManager jobManager = new JobManager("JobInstanceBuilderTest", new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
- String cubeName = "test_kylin_cube_with_slr_empty";
- CubeManager cubeManager = CubeManager.getInstance(this.getTestConfig());
- CubeInstance cube = cubeManager.getCube(cubeName);
-
- // initial segment
- CubeSegment segment = cubeManager.allocateSegments(cube, CubeBuildTypeEnum.BUILD, 0, dateEnd).get(0);
-
- JobInstance jobInstance = jobManager.createJob(cubeName, segment.getName(), UUID.randomUUID().toString(), CubeBuildTypeEnum.BUILD,"KylinTest");
-
- String actual = JsonUtil.writeValueAsIndentString(jobInstance);
- System.out.println(actual);
-
- assertEquals(13, jobInstance.getSteps().size());
-
- assertTrue(jobInstance.getSteps().get(3).getExecCmd().contains(JobEngineConfig.HADOOP_JOB_CONF_FILENAME + ".xml"));
-
- JobStep jobStep;
- // check each step
- jobStep = jobInstance.getSteps().get(0);
- assertEquals(JobStepCmdTypeEnum.SHELL_CMD_HADOOP, jobStep.getCmdType());
- assertEquals(false, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(1);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_FACTDISTINCT, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(2);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_NO_MR_DICTIONARY, jobStep.getCmdType());
- assertEquals(false, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(3);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_BASECUBOID, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(4);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_NDCUBOID, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(5);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_NDCUBOID, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(6);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_NDCUBOID, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(7);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_NDCUBOID, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(8);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_NDCUBOID, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(9);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(10);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, jobStep.getCmdType());
- assertEquals(false, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(11);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_CONVERTHFILE, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(12);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_NO_MR_BULKLOAD, jobStep.getCmdType());
- assertEquals(false, jobStep.isRunAsync());
- }
-
- @Test
- public void testCreateMergeSteps() throws Exception {
-
- JobManager jobManager = new JobManager("JobInstanceBuilderTest", new JobEngineConfig(KylinConfig.getInstanceFromEnv()));
- String cubeName = "test_kylin_cube_with_slr_ready_2_segments";
- CubeManager cubeManager = CubeManager.getInstance(this.getTestConfig());
- CubeInstance cube = cubeManager.getCube(cubeName);
-
- // initial segment
- CubeSegment segment = CubeManager.getInstance(this.getTestConfig()).allocateSegments(cube, CubeBuildTypeEnum.MERGE, 1384240200000L, 1386835200000L).get(0);
-
- JobInstance jobInstance = jobManager.createJob(cubeName, segment.getName(), UUID.randomUUID().toString(), CubeBuildTypeEnum.MERGE,"KylinTest");
-
- String actual = JsonUtil.writeValueAsIndentString(jobInstance);
- System.out.println(actual);
-
- assertEquals(5, jobInstance.getSteps().size());
-
- JobStep jobStep;
- // check each step
- jobStep = jobInstance.getSteps().get(0);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_MERGECUBOID, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(1);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_RANGEKEYDISTRIBUTION, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(2);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADDOP_NO_MR_CREATEHTABLE, jobStep.getCmdType());
- assertEquals(false, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(3);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_CONVERTHFILE, jobStep.getCmdType());
- assertEquals(true, jobStep.isRunAsync());
-
- jobStep = jobInstance.getSteps().get(4);
- assertEquals(JobStepCmdTypeEnum.JAVA_CMD_HADOOP_NO_MR_BULKLOAD, jobStep.getCmdType());
- assertEquals(false, jobStep.isRunAsync());
- }
-}