You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/12/25 10:33:51 UTC
[3/4] kylin git commit: KYLIN-1245 Put layer cubing and in-mem cubing
side by side, switch based on 'mapper overlap ratio'
KYLIN-1245 Put layer cubing and in-mem cubing side by side, switch based on 'mapper overlap ratio'
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2aedddf4
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2aedddf4
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2aedddf4
Branch: refs/heads/2.0-rc
Commit: 2aedddf4894cd09bf039bf9479a856cd38f6cf1d
Parents: 8936df1
Author: Yang Li <li...@apache.org>
Authored: Mon Dec 21 19:52:31 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Fri Dec 25 17:20:34 2015 +0800
----------------------------------------------------------------------
build/conf/kylin_hive_conf.xml | 2 +-
.../org/apache/kylin/common/KylinConfig.java | 578 +------------------
.../apache/kylin/common/KylinConfigBase.java | 506 ++++++++++++++++
.../org/apache/kylin/common/KylinConfigExt.java | 23 +
.../apache/kylin/common/util/MailService.java | 2 +-
.../kylin/common/util/MailServiceTest.java | 4 +-
.../org/apache/kylin/cube/CubeInstance.java | 10 -
.../java/org/apache/kylin/cube/CubeManager.java | 6 +-
.../org/apache/kylin/cube/model/CubeDesc.java | 3 +
.../org/apache/kylin/cube/CubeSegmentsTest.java | 2 +-
.../kylin/job/execution/AbstractExecutable.java | 4 +
.../kylin/engine/mr/BatchCubingJobBuilder2.java | 94 ++-
.../kylin/engine/mr/BatchMergeJobBuilder2.java | 35 +-
.../org/apache/kylin/engine/mr/CubingJob.java | 23 +-
.../org/apache/kylin/engine/mr/IMROutput2.java | 104 ++--
.../java/org/apache/kylin/engine/mr/MRUtil.java | 6 -
.../engine/mr/common/AbstractHadoopJob.java | 7 +-
.../kylin/engine/mr/common/CubeStatsReader.java | 246 ++++++++
.../kylin/engine/mr/common/CuboidStatsUtil.java | 15 +-
.../engine/mr/common/MapReduceExecutable.java | 6 +-
.../kylin/engine/mr/steps/BaseCuboidJob.java | 3 +-
.../apache/kylin/engine/mr/steps/CuboidJob.java | 26 +
.../mr/steps/FactDistinctColumnsReducer.java | 22 +-
.../kylin/engine/mr/steps/InMemCuboidJob.java | 78 ++-
.../engine/mr/steps/InMemCuboidReducer.java | 28 +-
.../mr/steps/MergeCuboidFromStorageJob.java | 94 ---
.../mr/steps/MergeCuboidFromStorageMapper.java | 261 ---------
.../engine/mr/steps/MergeStatisticsStep.java | 2 +-
.../engine/mr/steps/SaveStatisticsStep.java | 37 ++
.../apache/kylin/engine/spark/SparkCubing.java | 67 ++-
.../streaming/monitor/StreamingMonitor.java | 2 +-
.../kylin/rest/security/CrossDomainFilter.java | 2 +-
.../apache/kylin/rest/service/CubeService.java | 2 +-
.../apache/kylin/rest/service/JobService.java | 2 +-
.../storage/hbase/steps/CreateHTableJob.java | 143 +----
.../storage/hbase/steps/HBaseMROutput2.java | 290 ----------
.../hbase/steps/HBaseMROutput2Transition.java | 329 +----------
37 files changed, 1241 insertions(+), 1823 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/build/conf/kylin_hive_conf.xml
----------------------------------------------------------------------
diff --git a/build/conf/kylin_hive_conf.xml b/build/conf/kylin_hive_conf.xml
index 3fe17b6..9bf74d4 100644
--- a/build/conf/kylin_hive_conf.xml
+++ b/build/conf/kylin_hive_conf.xml
@@ -22,7 +22,7 @@ limitations under the License. See accompanying LICENSE file.
<property>
<name>dfs.block.size</name>
- <value>10485760</value>
+ <value>32000000</value>
<description>Want more mappers for in-mem cubing, thus smaller the DFS block size</description>
</property>
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index c65ade4..bc18989 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -27,129 +27,29 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
-import java.io.Serializable;
import java.io.StringWriter;
import java.util.Enumeration;
import java.util.Map;
import java.util.Properties;
-import java.util.SortedSet;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.Log4jConfigurer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.Sets;
-
/**
*/
@SuppressWarnings("serial")
-public class KylinConfig implements Serializable {
-
- /*
- * NOTE: These key constants should be private or even better be removed.
- * All external access should go through public methods.
- */
- public static final String KYLIN_OWNER = "kylin.owner";
-
- public static final String KYLIN_STORAGE_URL = "kylin.storage.url";
-
- public static final String KYLIN_METADATA_URL = "kylin.metadata.url";
-
- public static final String KYLIN_REST_SERVERS = "kylin.rest.servers";
-
- public static final String KYLIN_REST_TIMEZONE = "kylin.rest.timezone";
-
- public static final String KYLIN_JOB_CONCURRENT_MAX_LIMIT = "kylin.job.concurrent.max.limit";
-
- public static final String KYLIN_JOB_YARN_APP_REST_CHECK_URL = "kylin.job.yarn.app.rest.check.status.url";
+public class KylinConfig extends KylinConfigBase {
- public static final String KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS = "kylin.job.yarn.app.rest.check.interval.seconds";
-
- public static final String HIVE_TABLE_LOCATION_PREFIX = "hive.table.location.";
-
- public static final String KYLIN_JOB_REMOTE_CLI_PASSWORD = "kylin.job.remote.cli.password";
-
- public static final String KYLIN_JOB_REMOTE_CLI_USERNAME = "kylin.job.remote.cli.username";
-
- public static final String KYLIN_JOB_REMOTE_CLI_HOSTNAME = "kylin.job.remote.cli.hostname";
-
- public static final String KYLIN_JOB_REMOTE_CLI_PORT = "kylin.job.remote.cli.port";
-
- public static final String KYLIN_JOB_REMOTE_CLI_WORKING_DIR = "kylin.job.remote.cli.working.dir";
-
- public static final String KYLIN_JOB_CMD_EXTRA_ARGS = "kylin.job.cmd.extra.args";
- /**
- * Toggle to indicate whether to use hive for table flattening. Default
- * true.
- */
- public static final String KYLIN_JOB_RUN_AS_REMOTE_CMD = "kylin.job.run.as.remote.cmd";
-
- public static final String KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_COUNT_RATIO = "kylin.job.mapreduce.default.reduce.count.ratio";
-
- public static final String KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_INPUT_MB = "kylin.job.mapreduce.default.reduce.input.mb";
-
- public static final String KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER = "kylin.job.mapreduce.max.reducer.number";
-
- public static final String KYLIN_JOB_JAR = "kylin.job.jar";
-
- public static final String KYLIN_JOB_JAR_SPARK = "kylin.job.jar.spark";
-
- public static final String COPROCESSOR_LOCAL_JAR = "kylin.coprocessor.local.jar";
-
- public static final String KYLIN_JOB_LOG_DIR = "kylin.job.log.dir";
-
- public static final String KYLIN_JOB_CUBING_IN_MEM = "kylin.job.cubing.inMem";
-
- public static final String KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT = "kylin.job.cubing.inMem.sampling.percent";
-
- public static final String KYLIN_HDFS_WORKING_DIR = "kylin.hdfs.working.dir";
-
- public static final String KYLIN_HBASE_CLUSTER_FS = "kylin.hbase.cluster.fs";
-
- public static final String HIVE_DATABASE_FOR_INTERMEDIATE_TABLE = "kylin.job.hive.database.for.intermediatetable";
-
- public static final String HIVE_PASSWORD = "hive.password";
-
- public static final String HIVE_USER = "hive.user";
+ private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
- public static final String HIVE_URL = "hive.url";
- /**
- * Kylin properties file
- */
+ /** Kylin properties file name */
public static final String KYLIN_CONF_PROPERTIES_FILE = "kylin.properties";
-
- public static final String MAIL_ENABLED = "mail.enabled";
-
- public static final String MAIL_HOST = "mail.host";
-
- public static final String MAIL_USERNAME = "mail.username";
-
- public static final String MAIL_PASSWORD = "mail.password";
-
- public static final String MAIL_SENDER = "mail.sender";
-
- public static final String KYLIN_HOME = "KYLIN_HOME";
public static final String KYLIN_CONF = "KYLIN_CONF";
- public static final String HBASE_REGION_CUT_SMALL = "kylin.hbase.region.cut.small";
- public static final String HBASE_REGION_CUT_MEDIUM = "kylin.hbase.region.cut.medium";
- public static final String HBASE_REGION_CUT_LARGE = "kylin.hbase.region.cut.large";
-
- public static final String SPARK_HOME = "kylin.spark.home";
- public static final String SPARK_MASTER = "kylin.spark.master";
-
- private static final Logger logger = LoggerFactory.getLogger(KylinConfig.class);
-
- public static final String VERSION = "${project.version}";
-
- public static final String HTABLE_DEFAULT_COMPRESSION_CODEC = "kylin.hbase.default.compression.codec";
-
// static cached instances
private static KylinConfig ENV_INSTANCE = null;
@@ -249,363 +149,6 @@ public class KylinConfig implements Serializable {
return config;
}
- // ============================================================================
-
- /**
- * Find config from environment. The Search process: 1. Check the
- * $KYLIN_CONF/kylin.properties 2. Check the $KYLIN_HOME/conf/kylin.properties
- */
- private static KylinConfig loadKylinConfig() {
- Log4jConfigurer.initLogger();
-
- InputStream is = getKylinPropertiesAsInputSteam();
- if (is == null) {
- throw new IllegalArgumentException("Failed to load kylin config");
- }
- KylinConfig config = new KylinConfig();
- config.reloadKylinConfig(is);
-
- return config;
- }
-
- private volatile Properties properties = new Properties();
-
- public CliCommandExecutor getCliCommandExecutor() throws IOException {
- CliCommandExecutor exec = new CliCommandExecutor();
- if (getRunAsRemoteCommand()) {
- exec.setRunAtRemote(getRemoteHadoopCliHostname(), getRemoteHadoopCliPort(), getRemoteHadoopCliUsername(), getRemoteHadoopCliPassword());
- }
- return exec;
- }
-
- // ============================================================================
-
- public String getStorageUrl() {
- return getOptional(KYLIN_STORAGE_URL);
- }
-
- public String getHiveUrl() {
- return getOptional(HIVE_URL, "");
- }
-
- public String getHiveUser() {
- return getOptional(HIVE_USER, "");
- }
-
- public String getHivePassword() {
- return getOptional(HIVE_PASSWORD, "");
- }
-
- public String getHdfsWorkingDirectory() {
- String root = getRequired(KYLIN_HDFS_WORKING_DIR);
- if (!root.endsWith("/")) {
- root += "/";
- }
- return root + getMetadataUrlPrefix() + "/";
- }
-
- public String getHBaseClusterFs() {
- return getOptional(KYLIN_HBASE_CLUSTER_FS, "");
- }
-
- public String getKylinJobLogDir() {
- return getOptional(KYLIN_JOB_LOG_DIR, "/tmp/kylin/logs");
- }
-
- public String getKylinJobJarPath() {
- final String jobJar = getOptional(KYLIN_JOB_JAR);
- if (StringUtils.isNotEmpty(jobJar)) {
- return jobJar;
- }
- String kylinHome = getKylinHome();
- if (StringUtils.isEmpty(kylinHome)) {
- return "";
- }
- return getFileName(kylinHome + File.separator + "lib", JOB_JAR_NAME_PATTERN);
- }
-
- public String getKylinJobMRLibDir() {
- return getOptional("kylin.job.mr.lib.dir", "");
- }
-
- public String getKylinSparkJobJarPath() {
- final String jobJar = getOptional(KYLIN_JOB_JAR_SPARK);
- if (StringUtils.isNotEmpty(jobJar)) {
- return jobJar;
- }
- String kylinHome = getKylinHome();
- if (StringUtils.isEmpty(kylinHome)) {
- return "";
- }
- return getFileName(kylinHome + File.separator + "lib", SPARK_JOB_JAR_NAME_PATTERN);
- }
-
- public void overrideMRJobJarPath(String path) {
- logger.info("override " + KYLIN_JOB_JAR + " to " + path);
- System.setProperty(KYLIN_JOB_JAR, path);
- }
-
- public void overrideSparkJobJarPath(String path) {
- logger.info("override " + KYLIN_JOB_JAR_SPARK + " to " + path);
- System.setProperty(KYLIN_JOB_JAR_SPARK, path);
- }
-
- private static final Pattern COPROCESSOR_JAR_NAME_PATTERN = Pattern.compile("kylin-coprocessor-(.+)\\.jar");
- private static final Pattern JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-job-(.+)\\.jar");
- private static final Pattern SPARK_JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-engine-spark-(.+)\\.jar");
-
- public String getCoprocessorLocalJar() {
- final String coprocessorJar = getOptional(COPROCESSOR_LOCAL_JAR);
- if (StringUtils.isNotEmpty(coprocessorJar)) {
- return coprocessorJar;
- }
- String kylinHome = getKylinHome();
- if (StringUtils.isEmpty(kylinHome)) {
- throw new RuntimeException("getCoprocessorLocalJar needs KYLIN_HOME");
- }
- return getFileName(kylinHome + File.separator + "lib", COPROCESSOR_JAR_NAME_PATTERN);
- }
-
- private static String getFileName(String homePath, Pattern pattern) {
- File home = new File(homePath);
- SortedSet<String> files = Sets.newTreeSet();
- if (home.exists() && home.isDirectory()) {
- for (File file : home.listFiles()) {
- final Matcher matcher = pattern.matcher(file.getName());
- if (matcher.matches()) {
- files.add(file.getAbsolutePath());
- }
- }
- }
- if (files.isEmpty()) {
- throw new RuntimeException("cannot find " + pattern.toString() + " in " + homePath);
- } else {
- return files.last();
- }
- }
-
- public void overrideCoprocessorLocalJar(String path) {
- logger.info("override " + COPROCESSOR_LOCAL_JAR + " to " + path);
- System.setProperty(COPROCESSOR_LOCAL_JAR, path);
- }
-
- public double getDefaultHadoopJobReducerInputMB() {
- return Double.parseDouble(getOptional(KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_INPUT_MB, "500"));
- }
-
- public double getDefaultHadoopJobReducerCountRatio() {
- return Double.parseDouble(getOptional(KYLIN_JOB_MAPREDUCE_DEFAULT_REDUCE_COUNT_RATIO, "1.0"));
- }
-
- public int getHadoopJobMaxReducerNumber() {
- return Integer.parseInt(getOptional(KYLIN_JOB_MAPREDUCE_MAX_REDUCER_NUMBER, "500"));
- }
-
- public boolean getRunAsRemoteCommand() {
- return Boolean.parseBoolean(getOptional(KYLIN_JOB_RUN_AS_REMOTE_CMD));
- }
-
- public int getRemoteHadoopCliPort() {
- return Integer.parseInt(getOptional(KYLIN_JOB_REMOTE_CLI_PORT, "22"));
- }
-
- public String getRemoteHadoopCliHostname() {
- return getOptional(KYLIN_JOB_REMOTE_CLI_HOSTNAME);
- }
-
- public String getRemoteHadoopCliUsername() {
- return getOptional(KYLIN_JOB_REMOTE_CLI_USERNAME);
- }
-
- public String getRemoteHadoopCliPassword() {
- return getOptional(KYLIN_JOB_REMOTE_CLI_PASSWORD);
- }
-
- public String getCliWorkingDir() {
- return getOptional(KYLIN_JOB_REMOTE_CLI_WORKING_DIR);
- }
-
- public String getMapReduceCmdExtraArgs() {
- return getOptional(KYLIN_JOB_CMD_EXTRA_ARGS);
- }
-
- public String getOverrideHiveTableLocation(String table) {
- return getOptional(HIVE_TABLE_LOCATION_PREFIX + table.toUpperCase());
- }
-
- public String getYarnStatusCheckUrl() {
- return getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_URL, null);
- }
-
- public int getYarnStatusCheckIntervalSeconds() {
- return Integer.parseInt(getOptional(KYLIN_JOB_YARN_APP_REST_CHECK_INTERVAL_SECONDS, "60"));
- }
-
- public int getMaxConcurrentJobLimit() {
- return Integer.parseInt(getOptional(KYLIN_JOB_CONCURRENT_MAX_LIMIT, "10"));
- }
-
- public String getTimeZone() {
- return getOptional(KYLIN_REST_TIMEZONE, "PST");
- }
-
- public String[] getRestServers() {
- return getOptionalStringArray(KYLIN_REST_SERVERS);
- }
-
- public String getAdminDls() {
- return getOptional("kylin.job.admin.dls", null);
- }
-
- public long getJobStepTimeout() {
- return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60)));
- }
-
- public String getServerMode() {
- return this.getOptional("kylin.server.mode", "all");
- }
-
- public int getDictionaryMaxCardinality() {
- return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
- }
-
- public int getTableSnapshotMaxMB() {
- return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
- }
-
- public int getHBaseRegionCountMin() {
- return Integer.parseInt(getOptional("kylin.hbase.region.count.min", "1"));
- }
-
- public int getHBaseRegionCountMax() {
- return Integer.parseInt(getOptional("kylin.hbase.region.count.max", "500"));
- }
-
- public int getScanThreshold() {
- return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
- }
-
- public boolean getQueryRunLocalCoprocessor() {
- return Boolean.parseBoolean(getOptional("kylin.query.run.local.coprocessor", "false"));
- }
-
- public Long getQueryDurationCacheThreshold() {
- return Long.parseLong(this.getOptional("kylin.query.cache.threshold.duration", String.valueOf(2000)));
- }
-
- public Long getQueryScanCountCacheThreshold() {
- return Long.parseLong(this.getOptional("kylin.query.cache.threshold.scancount", String.valueOf(10 * 1024)));
- }
-
- public long getQueryMemBudget() {
- return Long.parseLong(this.getOptional("kylin.query.mem.budget", String.valueOf(3L * 1024 * 1024 * 1024)));
- }
-
- public boolean isQuerySecureEnabled() {
- return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "false"));
- }
-
- public boolean isQueryCacheEnabled() {
- return Boolean.parseBoolean(this.getOptional("kylin.query.cache.enabled", "true"));
- }
-
- public int getHBaseKeyValueSize() {
- return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760"));
- }
-
- public int getHBaseScanCacheRows() {
- return Integer.parseInt(this.getOptional("kylin.hbase.scan.cache_rows", "1024"));
- }
-
- public int getHBaseScanMaxResultSize() {
- return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB
- }
-
- public boolean isCubingInMem() {
- return Boolean.parseBoolean(this.getOptional(KYLIN_JOB_CUBING_IN_MEM, "false"));
- }
-
- public int getCubingInMemSamplingPercent() {
- int percent = Integer.parseInt(this.getOptional(KYLIN_JOB_CUBING_IN_MEM_SAMPLING_PERCENT, "100"));
- percent = Math.max(percent, 1);
- percent = Math.min(percent, 100);
- return percent;
- }
-
- public String getHbaseDefaultCompressionCodec() {
- return getOptional(HTABLE_DEFAULT_COMPRESSION_CODEC, "");
- }
-
- public boolean isHiveKeepFlatTable() {
- return Boolean.parseBoolean(this.getOptional("kylin.hive.keep.flat.table", "false"));
- }
-
- private String getOptional(String prop) {
- final String property = System.getProperty(prop);
- return property != null ? property : properties.getProperty(prop);
- }
-
- private String[] getOptionalStringArray(String prop) {
- final String property = getOptional(prop);
- if (!StringUtils.isBlank(property)) {
- return property.split("\\s*,\\s*");
- } else {
- return new String[] {};
- }
- }
-
- private String getOptional(String prop, String dft) {
- final String property = System.getProperty(prop);
- return property != null ? property : properties.getProperty(prop, dft);
- }
-
- private String getRequired(String prop) {
- final String property = System.getProperty(prop);
- if (property != null) {
- return property;
- }
- String r = properties.getProperty(prop);
- if (StringUtils.isEmpty(r)) {
- throw new IllegalArgumentException("missing '" + prop + "' in conf/kylin_instance.properties");
- }
- return r;
- }
-
- void reloadKylinConfig(InputStream is) {
- Properties newProperties = new Properties();
- try {
- newProperties.load(is);
- } catch (IOException e) {
- throw new RuntimeException("Cannot load kylin config.", e);
- } finally {
- IOUtils.closeQuietly(is);
- }
- this.properties = newProperties;
- }
-
- public void writeProperties(File file) throws IOException {
- FileOutputStream fos = null;
- try {
- fos = new FileOutputStream(file);
- properties.store(fos, file.getAbsolutePath());
- } finally {
- IOUtils.closeQuietly(fos);
- }
- }
-
- public static String getKylinHome() {
- String kylinHome = System.getenv(KYLIN_HOME);
- if (StringUtils.isEmpty(kylinHome)) {
- logger.warn("KYLIN_HOME was not set");
- return kylinHome;
- }
- return kylinHome;
- }
-
- public void printProperties() throws IOException {
- properties.list(System.out);
- }
-
private static File getKylinProperties() {
String kylinConfHome = System.getProperty(KYLIN_CONF);
if (!StringUtils.isEmpty(kylinConfHome)) {
@@ -679,68 +222,33 @@ public class KylinConfig implements Serializable {
return new File(path, KYLIN_CONF_PROPERTIES_FILE);
}
- public String getMetadataUrl() {
- return getOptional(KYLIN_METADATA_URL);
- }
-
- public String getMetadataUrlPrefix() {
- String hbaseMetadataUrl = getMetadataUrl();
- String defaultPrefix = "kylin_metadata";
+ /**
+ * Find config from environment. The Search process: 1. Check the
+ * $KYLIN_CONF/kylin.properties 2. Check the $KYLIN_HOME/conf/kylin.properties
+ */
+ private static KylinConfig loadKylinConfig() {
+ Log4jConfigurer.initLogger();
- if (org.apache.commons.lang3.StringUtils.containsIgnoreCase(hbaseMetadataUrl, "@hbase")) {
- int cut = hbaseMetadataUrl.indexOf('@');
- String tmp = cut < 0 ? defaultPrefix : hbaseMetadataUrl.substring(0, cut);
- return tmp;
- } else {
- return defaultPrefix;
+ InputStream is = getKylinPropertiesAsInputSteam();
+ if (is == null) {
+ throw new IllegalArgumentException("Failed to load kylin config");
}
- }
-
- public void setMetadataUrl(String metadataUrl) {
- properties.setProperty(KYLIN_METADATA_URL, metadataUrl);
- }
-
- public void setStorageUrl(String storageUrl) {
- properties.setProperty(KYLIN_STORAGE_URL, storageUrl);
- }
-
- public String getHiveDatabaseForIntermediateTable() {
- return this.getOptional(HIVE_DATABASE_FOR_INTERMEDIATE_TABLE, "default");
- }
-
- public String getKylinOwner() {
- return this.getOptional(KYLIN_OWNER, "");
- }
-
- public void setRunAsRemoteCommand(String v) {
- properties.setProperty(KYLIN_JOB_RUN_AS_REMOTE_CMD, v);
- }
-
- public void setRemoteHadoopCliHostname(String v) {
- properties.setProperty(KYLIN_JOB_REMOTE_CLI_HOSTNAME, v);
- }
-
- public void setRemoteHadoopCliUsername(String v) {
- properties.setProperty(KYLIN_JOB_REMOTE_CLI_USERNAME, v);
- }
+ KylinConfig config = new KylinConfig();
+ config.reloadKylinConfig(is);
- public void setRemoteHadoopCliPassword(String v) {
- properties.setProperty(KYLIN_JOB_REMOTE_CLI_PASSWORD, v);
+ return config;
}
- public String getProperty(String key, String defaultValue) {
- return properties.getProperty(key, defaultValue);
- }
+ // ============================================================================
- /**
- * Set a new key:value into the kylin config.
- *
- * @param key
- * @param value
- */
- public void setProperty(String key, String value) {
- logger.info("Kylin Config was updated with " + key + " : " + value);
- properties.setProperty(key, value);
+ public void writeProperties(File file) throws IOException {
+ FileOutputStream fos = null;
+ try {
+ fos = new FileOutputStream(file);
+ getAllProperties().store(fos, file.getAbsolutePath());
+ } finally {
+ IOUtils.closeQuietly(fos);
+ }
}
public String getConfigAsString() throws IOException {
@@ -750,42 +258,12 @@ public class KylinConfig implements Serializable {
}
private void list(PrintWriter out) {
- for (Enumeration e = properties.keys() ; e.hasMoreElements() ;) {
- String key = (String)e.nextElement();
- String val = (String)properties.get(key);
+ Properties props = getAllProperties();
+ for (Enumeration<?> e = props.keys(); e.hasMoreElements();) {
+ String key = (String) e.nextElement();
+ String val = (String) props.get(key);
out.println(key + "=" + val);
}
}
- public String getSparkHome() {
- return properties.getProperty(SPARK_HOME);
- }
-
- public String getSparkMaster() {
- return properties.getProperty(SPARK_MASTER);
- }
-
- public int getHBaseRegionCut(String capacity) {
- String cut;
- switch (capacity) {
- case "SMALL":
- cut = getProperty(HBASE_REGION_CUT_SMALL, "10");
- break;
- case "MEDIUM":
- cut = getProperty(HBASE_REGION_CUT_MEDIUM, "20");
- break;
- case "LARGE":
- cut = getProperty(HBASE_REGION_CUT_LARGE, "100");
- break;
- default:
- throw new IllegalArgumentException("Capacity not recognized: " + capacity);
- }
-
- return Integer.valueOf(cut);
- }
-
- public String toString() {
- return getMetadataUrl();
- }
-
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
new file mode 100644
index 0000000..302a2db
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -0,0 +1,506 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.Properties;
+import java.util.SortedSet;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.util.CliCommandExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Sets;
+
+@SuppressWarnings("serial")
+/**
+ * A base class that encapsulates access to a set of 'properties'.
+ * Subclasses can override methods in this class to extend the content of the 'properties',
+ * for example with some override values.
+ */
+public class KylinConfigBase implements Serializable {
+
+ private static final Logger logger = LoggerFactory.getLogger(KylinConfigBase.class);
+
+ /*
+ * DON'T DEFINE CONSTANTS FOR PROPERTY KEYS!
+ *
+ * 1) There is no external need to access property keys; all access goes through public methods.
+ * 2) It is cumbersome to maintain constants at the top and the code that uses them at the bottom.
+ * 3) Key literals usually appear only once.
+ */
+
+ public static String getKylinHome() {
+ String kylinHome = System.getenv("KYLIN_HOME");
+ if (StringUtils.isEmpty(kylinHome)) {
+ logger.warn("KYLIN_HOME was not set");
+ }
+ return kylinHome;
+ }
+
+ // ============================================================================
+
+ private volatile Properties properties = new Properties();
+
+ public String getOptional(String prop) {
+ return getOptional(prop, null);
+ }
+
+ public String getOptional(String prop, String dft) {
+ final String property = System.getProperty(prop);
+ return property != null ? property : properties.getProperty(prop, dft);
+ }
+
+ protected String[] getOptionalStringArray(String prop, String[] dft) {
+ final String property = getOptional(prop);
+ if (!StringUtils.isBlank(property)) {
+ return property.split("\\s*,\\s*");
+ } else {
+ return dft;
+ }
+ }
+
+ public String getRequired(String prop) {
+ String r = getOptional(prop);
+ if (StringUtils.isEmpty(r)) {
+ throw new IllegalArgumentException("missing '" + prop + "' in conf/kylin_instance.properties");
+ }
+ return r;
+ }
+
+ /**
+ * Use with care, properties should be read-only. This is for testing mostly.
+ */
+ public void setProperty(String key, String value) {
+ logger.info("Kylin Config was updated with " + key + " : " + value);
+ properties.setProperty(key, value);
+ }
+
+ protected Properties getAllProperties() {
+ return properties;
+ }
+
+ protected void reloadKylinConfig(InputStream is) {
+ Properties newProperties = new Properties();
+ try {
+ newProperties.load(is);
+ } catch (IOException e) {
+ throw new RuntimeException("Cannot load kylin config.", e);
+ } finally {
+ IOUtils.closeQuietly(is);
+ }
+ this.properties = newProperties;
+ }
+
+ // ============================================================================
+
+ public String getMetadataUrl() {
+ return getOptional("kylin.metadata.url");
+ }
+
+ public void setMetadataUrl(String metadataUrl) {
+ setProperty("kylin.metadata.url", metadataUrl);
+ }
+
+ public String getMetadataUrlPrefix() {
+ String hbaseMetadataUrl = getMetadataUrl();
+ String defaultPrefix = "kylin_metadata";
+
+ if (org.apache.commons.lang3.StringUtils.containsIgnoreCase(hbaseMetadataUrl, "@hbase")) {
+ int cut = hbaseMetadataUrl.indexOf('@');
+ String tmp = cut < 0 ? defaultPrefix : hbaseMetadataUrl.substring(0, cut);
+ return tmp;
+ } else {
+ return defaultPrefix;
+ }
+ }
+
+ public String getServerMode() {
+ return this.getOptional("kylin.server.mode", "all");
+ }
+
+ public String getStorageUrl() {
+ return getOptional("kylin.storage.url");
+ }
+
+ public void setStorageUrl(String storageUrl) {
+ setProperty("kylin.storage.url", storageUrl);
+ }
+
+ /** was for route to hive, not used any more */
+ @Deprecated
+ public String getHiveUrl() {
+ return getOptional("hive.url", "");
+ }
+
+ /** was for route to hive, not used any more */
+ @Deprecated
+ public String getHiveUser() {
+ return getOptional("hive.user", "");
+ }
+
+ /** was for route to hive, not used any more */
+ @Deprecated
+ public String getHivePassword() {
+ return getOptional("hive.password", "");
+ }
+
+ public String getHdfsWorkingDirectory() {
+ String root = getRequired("kylin.hdfs.working.dir");
+ if (!root.endsWith("/")) {
+ root += "/";
+ }
+ return root + getMetadataUrlPrefix() + "/";
+ }
+
+ public CliCommandExecutor getCliCommandExecutor() throws IOException {
+ CliCommandExecutor exec = new CliCommandExecutor();
+ if (getRunAsRemoteCommand()) {
+ exec.setRunAtRemote(getRemoteHadoopCliHostname(), getRemoteHadoopCliPort(), getRemoteHadoopCliUsername(), getRemoteHadoopCliPassword());
+ }
+ return exec;
+ }
+
+ public String getHBaseClusterFs() {
+ return getOptional("kylin.hbase.cluster.fs", "");
+ }
+
+ public String getKylinJobLogDir() {
+ return getOptional("kylin.job.log.dir", "/tmp/kylin/logs");
+ }
+
+ public String getKylinJobJarPath() {
+ final String jobJar = getOptional("kylin.job.jar");
+ if (StringUtils.isNotEmpty(jobJar)) {
+ return jobJar;
+ }
+ String kylinHome = getKylinHome();
+ if (StringUtils.isEmpty(kylinHome)) {
+ return "";
+ }
+ return getFileName(kylinHome + File.separator + "lib", JOB_JAR_NAME_PATTERN);
+ }
+
+ public void overrideMRJobJarPath(String path) {
+ logger.info("override " + "kylin.job.jar" + " to " + path);
+ System.setProperty("kylin.job.jar", path);
+ }
+
+ public String getKylinJobMRLibDir() {
+ return getOptional("kylin.job.mr.lib.dir", "");
+ }
+
+ public String getKylinSparkJobJarPath() {
+ final String jobJar = getOptional("kylin.job.jar.spark");
+ if (StringUtils.isNotEmpty(jobJar)) {
+ return jobJar;
+ }
+ String kylinHome = getKylinHome();
+ if (StringUtils.isEmpty(kylinHome)) {
+ return "";
+ }
+ return getFileName(kylinHome + File.separator + "lib", SPARK_JOB_JAR_NAME_PATTERN);
+ }
+
+ /**
+  * Logs the override and stores {@code path} under {@code kylin.job.jar.spark} as a JVM system
+  * property (via {@link System#setProperty}).
+  */
+ public void overrideSparkJobJarPath(String path) {
+     final String key = "kylin.job.jar.spark";
+     logger.info("override " + key + " to " + path);
+     System.setProperty(key, path);
+ }
+
+ // File-name patterns used with getFileName() to locate jars in the Kylin lib directory.
+ // The (.+) group is presumably the version part of the file name -- TODO confirm.
+ private static final Pattern COPROCESSOR_JAR_NAME_PATTERN = Pattern.compile("kylin-coprocessor-(.+)\\.jar");
+ private static final Pattern JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-job-(.+)\\.jar");
+ private static final Pattern SPARK_JOB_JAR_NAME_PATTERN = Pattern.compile("kylin-engine-spark-(.+)\\.jar");
+
+ /**
+  * Resolves the local path of the coprocessor jar.
+  * <p>
+  * A non-empty {@code kylin.coprocessor.local.jar} property wins. Otherwise the jar is searched
+  * for in the {@code lib} directory under {@link #getKylinHome()} using
+  * {@code COPROCESSOR_JAR_NAME_PATTERN}.
+  *
+  * @throws RuntimeException if the property is empty and the Kylin home is empty as well
+  */
+ public String getCoprocessorLocalJar() {
+     final String configuredJar = getOptional("kylin.coprocessor.local.jar");
+     if (!StringUtils.isEmpty(configuredJar)) {
+         return configuredJar;
+     }
+     final String home = getKylinHome();
+     if (!StringUtils.isEmpty(home)) {
+         return getFileName(home + File.separator + "lib", COPROCESSOR_JAR_NAME_PATTERN);
+     }
+     throw new RuntimeException("getCoprocessorLocalJar needs KYLIN_HOME");
+ }
+
+ /**
+  * Logs the override and stores {@code path} under {@code kylin.coprocessor.local.jar} as a JVM
+  * system property (via {@link System#setProperty}).
+  */
+ public void overrideCoprocessorLocalJar(String path) {
+     final String key = "kylin.coprocessor.local.jar";
+     logger.info("override " + key + " to " + path);
+     System.setProperty(key, path);
+ }
+
+ /**
+  * Finds a file directly under {@code homePath} whose name fully matches {@code pattern}.
+  * <p>
+  * When several files match, the one whose absolute path sorts last in natural string order
+  * is returned (string order, not version order).
+  *
+  * @param homePath directory to search; a missing or non-directory path yields no matches
+  * @param pattern  pattern the whole file name must match
+  * @return absolute path of the selected file
+  * @throws RuntimeException if no file matches
+  */
+ private static String getFileName(String homePath, Pattern pattern) {
+     File home = new File(homePath);
+     SortedSet<String> files = Sets.newTreeSet();
+     // listFiles() returns null for a non-directory AND on I/O errors (e.g. unreadable
+     // directory); treat both as "no candidates" instead of failing with an NPE.
+     File[] children = home.isDirectory() ? home.listFiles() : null;
+     if (children != null) {
+         for (File file : children) {
+             final Matcher matcher = pattern.matcher(file.getName());
+             if (matcher.matches()) {
+                 files.add(file.getAbsolutePath());
+             }
+         }
+     }
+     if (files.isEmpty()) {
+         throw new RuntimeException("cannot find " + pattern.toString() + " in " + homePath);
+     }
+     return files.last();
+ }
+
+ /** Parses optional property {@code kylin.job.mapreduce.default.reduce.input.mb} as a double; {@code "500"} is supplied as the default. */
+ public double getDefaultHadoopJobReducerInputMB() {
+ return Double.parseDouble(getOptional("kylin.job.mapreduce.default.reduce.input.mb", "500"));
+ }
+
+ /** Parses optional property {@code kylin.job.mapreduce.default.reduce.count.ratio} as a double; {@code "1.0"} is supplied as the default. */
+ public double getDefaultHadoopJobReducerCountRatio() {
+ return Double.parseDouble(getOptional("kylin.job.mapreduce.default.reduce.count.ratio", "1.0"));
+ }
+
+ /** Parses optional property {@code kylin.job.mapreduce.max.reducer.number} as an int; {@code "500"} is supplied as the default. */
+ public int getHadoopJobMaxReducerNumber() {
+ return Integer.parseInt(getOptional("kylin.job.mapreduce.max.reducer.number", "500"));
+ }
+
+ /**
+  * Parses optional property {@code kylin.job.run.as.remote.cmd} as a boolean. No default is
+  * supplied; {@link Boolean#parseBoolean} yields false for anything other than "true"
+  * (case-insensitive), including null.
+  */
+ public boolean getRunAsRemoteCommand() {
+ return Boolean.parseBoolean(getOptional("kylin.job.run.as.remote.cmd"));
+ }
+
+ /** Sets property {@code kylin.job.run.as.remote.cmd} to {@code v}. */
+ public void setRunAsRemoteCommand(String v) {
+ setProperty("kylin.job.run.as.remote.cmd", v);
+ }
+
+ /** Parses optional property {@code kylin.job.remote.cli.port} as an int; {@code "22"} is supplied as the default. */
+ public int getRemoteHadoopCliPort() {
+ return Integer.parseInt(getOptional("kylin.job.remote.cli.port", "22"));
+ }
+
+ /** Reads optional property {@code kylin.job.remote.cli.hostname}; no default is supplied. */
+ public String getRemoteHadoopCliHostname() {
+ return getOptional("kylin.job.remote.cli.hostname");
+ }
+
+ /** Sets property {@code kylin.job.remote.cli.hostname} to {@code v}. */
+ public void setRemoteHadoopCliHostname(String v) {
+ setProperty("kylin.job.remote.cli.hostname", v);
+ }
+
+ /** Reads optional property {@code kylin.job.remote.cli.username}; no default is supplied. */
+ public String getRemoteHadoopCliUsername() {
+ return getOptional("kylin.job.remote.cli.username");
+ }
+
+ /** Sets property {@code kylin.job.remote.cli.username} to {@code v}. */
+ public void setRemoteHadoopCliUsername(String v) {
+ setProperty("kylin.job.remote.cli.username", v);
+ }
+
+ /** Reads optional property {@code kylin.job.remote.cli.password}; no default is supplied. */
+ public String getRemoteHadoopCliPassword() {
+ return getOptional("kylin.job.remote.cli.password");
+ }
+
+ /** Sets property {@code kylin.job.remote.cli.password} to {@code v}. */
+ public void setRemoteHadoopCliPassword(String v) {
+ setProperty("kylin.job.remote.cli.password", v);
+ }
+
+ /** Reads optional property {@code kylin.job.remote.cli.working.dir}; no default is supplied. */
+ public String getCliWorkingDir() {
+ return getOptional("kylin.job.remote.cli.working.dir");
+ }
+
+ /** Reads optional property {@code kylin.job.cmd.extra.args}; no default is supplied. */
+ public String getMapReduceCmdExtraArgs() {
+ return getOptional("kylin.job.cmd.extra.args");
+ }
+
+ /**
+  * Reads the optional per-table property {@code hive.table.location.<TABLE>}, where
+  * {@code <TABLE>} is the upper-cased {@code table} argument. No default is supplied.
+  *
+  * @param table table name; must not be null
+  */
+ public String getOverrideHiveTableLocation(String table) {
+     // Upper-case with a fixed locale: the default-locale variant maps 'i' to a dotted
+     // capital under e.g. Turkish, which would build a key that never matches the config.
+     return getOptional("hive.table.location." + table.toUpperCase(java.util.Locale.ROOT));
+ }
+
+ /** Reads optional property {@code kylin.job.yarn.app.rest.check.status.url}, with {@code null} supplied as the default. */
+ public String getYarnStatusCheckUrl() {
+ return getOptional("kylin.job.yarn.app.rest.check.status.url", null);
+ }
+
+ /** Parses optional property {@code kylin.job.yarn.app.rest.check.interval.seconds} as an int; {@code "60"} is supplied as the default. */
+ public int getYarnStatusCheckIntervalSeconds() {
+ return Integer.parseInt(getOptional("kylin.job.yarn.app.rest.check.interval.seconds", "60"));
+ }
+
+ /** Parses optional property {@code kylin.job.concurrent.max.limit} as an int; {@code "10"} is supplied as the default. */
+ public int getMaxConcurrentJobLimit() {
+ return Integer.parseInt(getOptional("kylin.job.concurrent.max.limit", "10"));
+ }
+
+ /** Reads optional property {@code kylin.rest.timezone}, with {@code "PST"} supplied as the default. */
+ public String getTimeZone() {
+ return getOptional("kylin.rest.timezone", "PST");
+ }
+
+ /** Reads optional string-array property {@code kylin.rest.servers}, with an empty array supplied as the default. */
+ public String[] getRestServers() {
+ return getOptionalStringArray("kylin.rest.servers", new String[0]);
+ }
+
+ /** Reads optional property {@code kylin.job.admin.dls}, with {@code null} supplied as the default. */
+ public String getAdminDls() {
+ return getOptional("kylin.job.admin.dls", null);
+ }
+
+ /**
+  * Parses optional property {@code kylin.job.step.timeout} as a long; the supplied default is
+  * 2 * 60 * 60 = 7200 (presumably seconds, i.e. two hours -- TODO confirm the unit).
+  */
+ public long getJobStepTimeout() {
+ return Long.parseLong(getOptional("kylin.job.step.timeout", String.valueOf(2 * 60 * 60)));
+ }
+
+ /** Reads optional property {@code kylin.cube.algorithm}, with {@code "auto"} supplied as the default. */
+ public String getCubeAlgorithm() {
+ return getOptional("kylin.cube.algorithm", "auto");
+ }
+
+ /** Parses optional property {@code kylin.cube.algorithm.auto.threshold} as a double; {@code "8"} is supplied as the default. */
+ public double getCubeAlgorithmAutoThreshold() {
+ return Double.parseDouble(getOptional("kylin.cube.algorithm.auto.threshold", "8"));
+ }
+
+ /** Parses optional property {@code kylin.dictionary.max.cardinality} as an int; {@code "5000000"} is supplied as the default. */
+ public int getDictionaryMaxCardinality() {
+ return Integer.parseInt(getOptional("kylin.dictionary.max.cardinality", "5000000"));
+ }
+
+ /** Parses optional property {@code kylin.table.snapshot.max_mb} as an int; {@code "300"} is supplied as the default. */
+ public int getTableSnapshotMaxMB() {
+ return Integer.parseInt(getOptional("kylin.table.snapshot.max_mb", "300"));
+ }
+
+ /**
+  * Returns the HBase region cut configured for the given capacity.
+  * <p>
+  * {@code "SMALL"}, {@code "MEDIUM"} and {@code "LARGE"} map to the optional properties
+  * {@code kylin.hbase.region.cut.small|medium|large}, with {@code "10"}, {@code "20"} and
+  * {@code "100"} supplied as the respective defaults.
+  *
+  * @param capacity one of {@code "SMALL"}, {@code "MEDIUM"}, {@code "LARGE"} (exact match)
+  * @throws IllegalArgumentException if {@code capacity} is any other value
+  */
+ public int getHBaseRegionCut(String capacity) {
+     final String key;
+     final String dft;
+     switch (capacity) {
+     case "SMALL":
+         key = "kylin.hbase.region.cut.small";
+         dft = "10";
+         break;
+     case "MEDIUM":
+         key = "kylin.hbase.region.cut.medium";
+         dft = "20";
+         break;
+     case "LARGE":
+         key = "kylin.hbase.region.cut.large";
+         dft = "100";
+         break;
+     default:
+         throw new IllegalArgumentException("Capacity not recognized: " + capacity);
+     }
+     return Integer.parseInt(getOptional(key, dft));
+ }
+
+ /** Parses optional property {@code kylin.hbase.region.count.min} as an int; {@code "1"} is supplied as the default. */
+ public int getHBaseRegionCountMin() {
+ return Integer.parseInt(getOptional("kylin.hbase.region.count.min", "1"));
+ }
+
+ /** Parses optional property {@code kylin.hbase.region.count.max} as an int; {@code "500"} is supplied as the default. */
+ public int getHBaseRegionCountMax() {
+ return Integer.parseInt(getOptional("kylin.hbase.region.count.max", "500"));
+ }
+
+ /** Parses optional property {@code kylin.query.scan.threshold} as an int; {@code "10000000"} is supplied as the default. */
+ public int getScanThreshold() {
+ return Integer.parseInt(getOptional("kylin.query.scan.threshold", "10000000"));
+ }
+
+ /** Parses optional property {@code kylin.query.run.local.coprocessor} as a boolean; {@code "false"} is supplied as the default. */
+ public boolean getQueryRunLocalCoprocessor() {
+ return Boolean.parseBoolean(getOptional("kylin.query.run.local.coprocessor", "false"));
+ }
+
+ /**
+  * Parses optional property {@code kylin.query.cache.threshold.duration} as a long (returned
+  * boxed); {@code 2000} is supplied as the default (presumably milliseconds -- TODO confirm).
+  */
+ public Long getQueryDurationCacheThreshold() {
+ return Long.parseLong(this.getOptional("kylin.query.cache.threshold.duration", String.valueOf(2000)));
+ }
+
+ /**
+  * Parses optional property {@code kylin.query.cache.threshold.scancount} as a long (returned
+  * boxed); 10 * 1024 = 10240 is supplied as the default.
+  */
+ public Long getQueryScanCountCacheThreshold() {
+ return Long.parseLong(this.getOptional("kylin.query.cache.threshold.scancount", String.valueOf(10 * 1024)));
+ }
+
+ /**
+  * Parses optional property {@code kylin.query.mem.budget} as a long; the supplied default is
+  * 3 * 1024 * 1024 * 1024 (presumably bytes, i.e. 3 GB -- TODO confirm the unit).
+  */
+ public long getQueryMemBudget() {
+ return Long.parseLong(this.getOptional("kylin.query.mem.budget", String.valueOf(3L * 1024 * 1024 * 1024)));
+ }
+
+ /** Parses optional property {@code kylin.query.security.enabled} as a boolean; {@code "false"} is supplied as the default. */
+ public boolean isQuerySecureEnabled() {
+ return Boolean.parseBoolean(this.getOptional("kylin.query.security.enabled", "false"));
+ }
+
+ /** Parses optional property {@code kylin.query.cache.enabled} as a boolean; {@code "true"} is supplied as the default. */
+ public boolean isQueryCacheEnabled() {
+ return Boolean.parseBoolean(this.getOptional("kylin.query.cache.enabled", "true"));
+ }
+
+ /** Parses optional property {@code kylin.query.ignore_unknown_function} as a boolean; {@code "false"} is supplied as the default. */
+ public boolean isQueryIgnoreUnknownFunction() {
+ return Boolean.parseBoolean(this.getOptional("kylin.query.ignore_unknown_function", "false"));
+ }
+
+ /** Parses optional property {@code kylin.hbase.client.keyvalue.maxsize} as an int; {@code "10485760"} (10 * 1024 * 1024) is supplied as the default. */
+ public int getHBaseKeyValueSize() {
+ return Integer.parseInt(this.getOptional("kylin.hbase.client.keyvalue.maxsize", "10485760"));
+ }
+
+ /** Parses optional property {@code kylin.hbase.scan.cache_rows} as an int; {@code "1024"} is supplied as the default. */
+ public int getHBaseScanCacheRows() {
+ return Integer.parseInt(this.getOptional("kylin.hbase.scan.cache_rows", "1024"));
+ }
+
+ /** Parses optional property {@code kylin.hbase.scan.max_result_size} as an int; 5 * 1024 * 1024 is supplied as the default. */
+ public int getHBaseScanMaxResultSize() {
+ return Integer.parseInt(this.getOptional("kylin.hbase.scan.max_result_size", "" + (5 * 1024 * 1024))); // 5 MB
+ }
+
+ /**
+  * Parses optional property {@code kylin.job.cubing.inMem.sampling.percent} as an int
+  * ({@code "100"} is supplied as the default) and clamps the result to the range 1..100.
+  */
+ public int getCubingInMemSamplingPercent() {
+     final int configured = Integer.parseInt(this.getOptional("kylin.job.cubing.inMem.sampling.percent", "100"));
+     // raise to at least 1 first, then cap at 100
+     return Math.min(Math.max(configured, 1), 100);
+ }
+
+ /** Reads optional property {@code kylin.hbase.default.compression.codec}, with an empty string supplied as the default. */
+ public String getHbaseDefaultCompressionCodec() {
+ return getOptional("kylin.hbase.default.compression.codec", "");
+ }
+
+ /** Parses optional property {@code kylin.hive.keep.flat.table} as a boolean; {@code "false"} is supplied as the default. */
+ public boolean isHiveKeepFlatTable() {
+ return Boolean.parseBoolean(this.getOptional("kylin.hive.keep.flat.table", "false"));
+ }
+
+ /** Reads optional property {@code kylin.job.hive.database.for.intermediatetable}, with {@code "default"} supplied as the default. */
+ public String getHiveDatabaseForIntermediateTable() {
+ return this.getOptional("kylin.job.hive.database.for.intermediatetable", "default");
+ }
+
+ /** Reads optional property {@code kylin.owner}, with an empty string supplied as the default. */
+ public String getKylinOwner() {
+ return this.getOptional("kylin.owner", "");
+ }
+
+ /** Reads required property {@code kylin.spark.home}. */
+ public String getSparkHome() {
+ return getRequired("kylin.spark.home");
+ }
+
+ /** Reads required property {@code kylin.spark.master}. */
+ public String getSparkMaster() {
+ return getRequired("kylin.spark.master");
+ }
+
+ /** Parses optional property {@code mail.enabled} as a boolean; {@code "false"} is supplied as the default. */
+ public boolean isMailEnabled() {
+ return Boolean.parseBoolean(getOptional("mail.enabled", "false"));
+ }
+
+ /** Sets property {@code mail.enabled} to the string form of {@code enable}. */
+ public void setMailEnabled(boolean enable) {
+ setProperty("mail.enabled", "" + enable);
+ }
+
+ /** Reads optional property {@code mail.host}, with an empty string supplied as the default. */
+ public String getMailHost() {
+ return getOptional("mail.host", "");
+ }
+
+ /** Reads optional property {@code mail.username}, with an empty string supplied as the default. */
+ public String getMailUsername() {
+ return getOptional("mail.username", "");
+ }
+
+ /** Reads optional property {@code mail.password}, with an empty string supplied as the default. */
+ public String getMailPassword() {
+ return getOptional("mail.password", "");
+ }
+
+ /** Reads optional property {@code mail.sender}, with an empty string supplied as the default. */
+ public String getMailSender() {
+ return getOptional("mail.sender", "");
+ }
+
+ /** Returns the metadata URL of this config as its string representation. */
+ @Override
+ public String toString() {
+     return getMetadataUrl();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
new file mode 100644
index 0000000..8af35b6
--- /dev/null
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigExt.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.common;
+
+ /**
+  * Currently an empty class: it declares no fields or methods.
+  * NOTE(review): presumably a placeholder for an extended/overridable Kylin config -- confirm intent.
+  */
+ public class KylinConfigExt {
+
+ }
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/MailService.java b/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
index 3b0a4e1..9a4ec64 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/MailService.java
@@ -42,7 +42,7 @@ public class MailService {
private static final Log logger = LogFactory.getLog(MailService.class);
public MailService(KylinConfig config) {
- this("true".equalsIgnoreCase(config.getProperty(KylinConfig.MAIL_ENABLED, "false")), config.getProperty(KylinConfig.MAIL_HOST, ""), config.getProperty(KylinConfig.MAIL_USERNAME, ""), config.getProperty(KylinConfig.MAIL_PASSWORD, ""), config.getProperty(KylinConfig.MAIL_SENDER, ""));
+ this(config.isMailEnabled(), config.getMailHost(), config.getMailUsername(), config.getMailPassword(), config.getMailSender());
}
private MailService(boolean enabled, String host, String username, String password, String sender) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
----------------------------------------------------------------------
diff --git a/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java b/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
index bc4d7cf..df06221 100644
--- a/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
+++ b/core-common/src/test/java/org/apache/kylin/common/util/MailServiceTest.java
@@ -51,8 +51,8 @@ public class MailServiceTest extends LocalFileMetadataTestCase {
boolean sent = sendTestEmail(mailservice);
assert sent;
- // set mail.enabled=false, and run again, this time should be no mail delviered
- config.setProperty(KylinConfig.MAIL_ENABLED, "false");
+ // set mail.enabled=false, and run again, this time should be no mail delivered
+ config.setMailEnabled(false);
mailservice = new MailService(config);
sent = sendTestEmail(mailservice);
assert !sent;
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
index 2208136..41c1f41 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeInstance.java
@@ -297,16 +297,6 @@ public class CubeInstance extends RootPersistentEntity implements IRealization,
return result;
}
- public List<CubeSegment> getSegment(SegmentStatusEnum status) {
- List<CubeSegment> result = Lists.newArrayList();
- for (CubeSegment segment : segments) {
- if (segment.getStatus() == status) {
- result.add(segment);
- }
- }
- return result;
- }
-
public CubeSegment getSegment(String name, SegmentStatusEnum status) {
for (CubeSegment segment : segments) {
if ((null != segment.getName() && segment.getName().equals(name)) && segment.getStatus() == status) {
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4592b15..c5ea962 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -270,7 +270,7 @@ public class CubeManager implements IRealizationProvider {
}
private boolean validateReadySegments(CubeInstance cube) {
- final List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY);
+ final List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
if (readySegments.size() == 0) {
return true;
}
@@ -474,7 +474,7 @@ public class CubeManager implements IRealizationProvider {
}
private Pair<Long, Long> alignMergeRange(CubeInstance cube, long startDate, long endDate) {
- List<CubeSegment> readySegments = cube.getSegment(SegmentStatusEnum.READY);
+ List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY);
if (readySegments.isEmpty()) {
throw new IllegalStateException("there are no segments in ready state");
}
@@ -609,7 +609,7 @@ public class CubeManager implements IRealizationProvider {
return null;
}
- List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegment(SegmentStatusEnum.READY));
+ List<CubeSegment> readySegments = Lists.newArrayList(cube.getSegments(SegmentStatusEnum.READY));
if (readySegments.size() == 0) {
logger.debug("Cube " + cube.getName() + " has no ready segment to merge");
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index e78dea5..6f3dc00 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -26,6 +26,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
@@ -124,6 +125,8 @@ public class CubeDesc extends RootPersistentEntity {
private int engineType = IEngineAware.ID_MR_V1;
@JsonProperty("storage_type")
private int storageType = IStorageAware.ID_HBASE;
+ @JsonProperty("override_kylin_properties")
+ private LinkedHashMap<String, String> overrideKylinProps = new LinkedHashMap<String, String>();
private Map<String, Map<String, TblColRef>> columnMap = new HashMap<String, Map<String, TblColRef>>();
private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
index 6d1d39b..c1f55d1 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/CubeSegmentsTest.java
@@ -65,7 +65,7 @@ public class CubeSegmentsTest extends LocalFileMetadataTestCase {
// assert one ready segment
assertEquals(1, cube.getSegments().size());
- CubeSegment seg = cube.getSegment(SegmentStatusEnum.READY).get(0);
+ CubeSegment seg = cube.getSegments(SegmentStatusEnum.READY).get(0);
assertEquals(SegmentStatusEnum.READY, seg.getStatus());
// append again, for non-partitioned cube, it becomes a full refresh
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
----------------------------------------------------------------------
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index 7ba328d..18d3193 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -269,6 +269,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected final void addExtraInfo(String key, String value) {
executableManager.addJobInfo(getId(), key, value);
}
+
+ /**
+  * Returns the extra-info map of this executable's output, looked up through the
+  * executable manager by this executable's id.
+  */
+ protected final Map<String, String> getExtraInfo() {
+     final Output output = executableManager.getOutput(getId());
+     return output.getExtra();
+ }
public final void setStartTime(long time) {
addExtraInfo(START_TIME, time + "");
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index f8fbc33..86439a8 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -19,10 +19,13 @@
package org.apache.kylin.engine.mr;
import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.RowKeyDesc;
import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BaseCuboidJob;
import org.apache.kylin.engine.mr.steps.InMemCuboidJob;
+import org.apache.kylin.engine.mr.steps.NDCuboidJob;
import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
@@ -37,14 +40,15 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
public BatchCubingJobBuilder2(CubeSegment newSegment, String submitter) {
super(newSegment, submitter);
this.inputSide = MRUtil.getBatchCubingInputSide(seg);
- this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment)seg);
+ this.outputSide = MRUtil.getBatchCubingOutputSide2((CubeSegment) seg);
}
public CubingJob build() {
logger.info("MR_V2 new job to BUILD segment " + seg);
-
- final CubingJob result = CubingJob.createBuildJob((CubeSegment)seg, submitter, config);
+
+ final CubingJob result = CubingJob.createBuildJob((CubeSegment) seg, submitter, config);
final String jobId = result.getId();
+ final String cuboidRootPath = getCuboidRootPath(jobId);
// Phase 1: Create Flat Table
inputSide.addStepPhase1_CreateFlatTable(result);
@@ -56,8 +60,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
outputSide.addStepPhase2_BuildDictionary(result);
// Phase 3: Build Cube
- result.addTask(createInMemCubingStep(jobId));
- outputSide.addStepPhase3_BuildCube(result);
+ addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
+ result.addTask(createInMemCubingStep(jobId, cuboidRootPath)); // inmem cubing, only selected algorithm will execute
+ outputSide.addStepPhase3_BuildCube(result, cuboidRootPath);
// Phase 4: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterBuildStep(jobId));
@@ -67,28 +72,44 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return result;
}
+ /**
+  * Appends the layer-cubing steps to {@code result}: one base cuboid step followed by one
+  * N-dimension cuboid step per build level (1 .. build level of the cube desc), each level
+  * having one dimension fewer than the previous one.
+  *
+  * @param result         job the steps are added to
+  * @param jobId          id of the cubing job, passed on to every step
+  * @param cuboidRootPath root path under which the per-level cuboid outputs are placed
+  */
+ private void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) {
+     final CubeSegment cubeSeg = (CubeSegment) seg;
+     final RowKeyDesc rowKey = cubeSeg.getCubeDesc().getRowkey();
+     final int levels = cubeSeg.getCubeDesc().getBuildLevel();
+     final int totalDims = rowKey.getRowKeyColumns().length;
+     final String[] levelPaths = getCuboidOutputPaths(cuboidRootPath, totalDims, levels);
+
+     // level 0: base cuboid
+     result.addTask(createBaseCuboidStep(levelPaths, jobId));
+
+     // levels 1..N: each one drops one more dimension
+     for (int level = 1; level <= levels; level++) {
+         result.addTask(createNDimensionCuboidStep(levelPaths, totalDims - level, totalDims, jobId));
+     }
+ }
+
private SaveStatisticsStep createSaveStatisticsStep(String jobId) {
SaveStatisticsStep result = new SaveStatisticsStep();
result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
result.setCubeName(seg.getRealization().getName());
result.setSegmentId(seg.getUuid());
result.setStatisticsPath(getStatisticsPath(jobId));
+ result.setCubingJobId(jobId);
return result;
}
- private MapReduceExecutable createInMemCubingStep(String jobId) {
+ private MapReduceExecutable createInMemCubingStep(String jobId, String cuboidRootPath) {
// base cuboid job
MapReduceExecutable cubeStep = new MapReduceExecutable();
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
cubeStep.setName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE);
appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "output", cuboidRootPath);
appendExecCmdParameters(cmd, "jobname", "Kylin_Cube_Builder_" + seg.getRealization().getName());
- appendExecCmdParameters(cmd, "jobflowid", jobId);
+ appendExecCmdParameters(cmd, "cubingJobId", jobId);
cubeStep.setMapReduceParams(cmd.toString());
cubeStep.setMapReduceJobClass(InMemCuboidJob.class);
@@ -96,4 +117,61 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
return cubeStep;
}
+ private MapReduceExecutable createBaseCuboidStep(String[] cuboidOutputTempPath, String jobId) {
+ // base cuboid job
+ MapReduceExecutable baseCuboidStep = new MapReduceExecutable();
+
+ StringBuilder cmd = new StringBuilder();
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+
+ baseCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_BASE_CUBOID);
+
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", "FLAT_TABLE"); // marks flat table input
+ appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[0]);
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
+ appendExecCmdParameters(cmd, "level", "0");
+ appendExecCmdParameters(cmd, "cubingJobId", jobId);
+
+ baseCuboidStep.setMapReduceParams(cmd.toString());
+ baseCuboidStep.setMapReduceJobClass(BaseCuboidJob.class);
+ baseCuboidStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+ return baseCuboidStep;
+ }
+
+ private MapReduceExecutable createNDimensionCuboidStep(String[] cuboidOutputTempPath, int dimNum, int totalRowkeyColumnCount, String jobId) {
+ // ND cuboid job
+ MapReduceExecutable ndCuboidStep = new MapReduceExecutable();
+
+ ndCuboidStep.setName(ExecutableConstants.STEP_NAME_BUILD_N_D_CUBOID + " : " + dimNum + "-Dimension");
+ StringBuilder cmd = new StringBuilder();
+
+ appendMapReduceParameters(cmd, ((CubeSegment) seg).getCubeDesc().getModel());
+ appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+ appendExecCmdParameters(cmd, "segmentname", seg.getName());
+ appendExecCmdParameters(cmd, "input", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum - 1]);
+ appendExecCmdParameters(cmd, "output", cuboidOutputTempPath[totalRowkeyColumnCount - dimNum]);
+ appendExecCmdParameters(cmd, "jobname", "Kylin_ND-Cuboid_Builder_" + seg.getRealization().getName() + "_Step");
+ appendExecCmdParameters(cmd, "level", "" + (totalRowkeyColumnCount - dimNum));
+ appendExecCmdParameters(cmd, "cubingJobId", jobId);
+
+ ndCuboidStep.setMapReduceParams(cmd.toString());
+ ndCuboidStep.setMapReduceJobClass(NDCuboidJob.class);
+ return ndCuboidStep;
+ }
+
+ private String[] getCuboidOutputPaths(String cuboidRootPath, int totalRowkeyColumnCount, int groupRowkeyColumnsCount) {
+ String[] paths = new String[groupRowkeyColumnsCount + 1];
+ for (int i = 0; i <= groupRowkeyColumnsCount; i++) {
+ int dimNum = totalRowkeyColumnCount - i;
+ if (dimNum == totalRowkeyColumnCount) {
+ paths[i] = cuboidRootPath + "base_cuboid";
+ } else {
+ paths[i] = cuboidRootPath + dimNum + "d_cuboid";
+ }
+ }
+ return paths;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index 48a717f..008d489 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -23,7 +23,7 @@ import java.util.List;
import org.apache.kylin.common.util.StringUtil;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.MergeCuboidFromStorageJob;
+import org.apache.kylin.engine.mr.steps.MergeCuboidJob;
import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
import org.apache.kylin.job.constant.ExecutableConstants;
import org.slf4j.Logger;
@@ -39,23 +39,24 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
public BatchMergeJobBuilder2(CubeSegment mergeSegment, String submitter) {
super(mergeSegment, submitter);
- this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment)seg);
+ this.outputSide = MRUtil.getBatchMergeOutputSide2((CubeSegment) seg);
}
public CubingJob build() {
logger.info("MR_V2 new job to MERGE segment " + seg);
-
- final CubeSegment cubeSegment = (CubeSegment)seg;
+
+ final CubeSegment cubeSegment = (CubeSegment) seg;
final CubingJob result = CubingJob.createMergeJob(cubeSegment, submitter, config);
final String jobId = result.getId();
+ final String cuboidRootPath = getCuboidRootPath(jobId);
final List<CubeSegment> mergingSegments = cubeSegment.getCubeInstance().getMergingSegments(cubeSegment);
Preconditions.checkState(mergingSegments.size() > 1, "there should be more than 2 segments to merge");
final List<String> mergingSegmentIds = Lists.newArrayList();
- final List<String> mergingHTables = Lists.newArrayList();
+ final List<String> mergingCuboidPaths = Lists.newArrayList();
for (CubeSegment merging : mergingSegments) {
mergingSegmentIds.add(merging.getUuid());
- mergingHTables.add(merging.getStorageLocationIdentifier());
+ mergingCuboidPaths.add(getCuboidRootPath(merging) + "*");
}
// Phase 1: Merge Dictionary
@@ -63,10 +64,10 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
result.addTask(createMergeStatisticsStep(cubeSegment, mergingSegmentIds, getStatisticsPath(jobId)));
outputSide.addStepPhase1_MergeDictionary(result);
- // Phase 2: Merge Cube
- String formattedTables = StringUtil.join(mergingHTables, ",");
- result.addTask(createMergeCuboidDataFromStorageStep(formattedTables, jobId));
- outputSide.addStepPhase2_BuildCube(result);
+ // Phase 2: Merge Cube Files
+ String formattedPath = StringUtil.join(mergingCuboidPaths, ",");
+ result.addTask(createMergeCuboidDataStep(cubeSegment, formattedPath, cuboidRootPath));
+ outputSide.addStepPhase2_BuildCube(result, cuboidRootPath);
// Phase 3: Update Metadata & Cleanup
result.addTask(createUpdateCubeInfoAfterMergeStep(mergingSegmentIds, jobId));
@@ -85,20 +86,20 @@ public class BatchMergeJobBuilder2 extends JobBuilderSupport {
return result;
}
- private MapReduceExecutable createMergeCuboidDataFromStorageStep(String inputTableNames, String jobId) {
+ private MapReduceExecutable createMergeCuboidDataStep(CubeSegment seg, String inputPath, String outputPath) {
MapReduceExecutable mergeCuboidDataStep = new MapReduceExecutable();
mergeCuboidDataStep.setName(ExecutableConstants.STEP_NAME_MERGE_CUBOID);
StringBuilder cmd = new StringBuilder();
- appendMapReduceParameters(cmd, ((CubeSegment)seg).getCubeDesc().getModel());
- appendExecCmdParameters(cmd, "cubename", seg.getRealization().getName());
+ appendMapReduceParameters(cmd, seg.getRealization().getDataModelDesc());
+ appendExecCmdParameters(cmd, "cubename", seg.getCubeInstance().getName());
appendExecCmdParameters(cmd, "segmentname", seg.getName());
- appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getRealization().getName() + "_Step");
- appendExecCmdParameters(cmd, "jobflowid", jobId);
+ appendExecCmdParameters(cmd, "input", inputPath);
+ appendExecCmdParameters(cmd, "output", outputPath);
+ appendExecCmdParameters(cmd, "jobname", "Kylin_Merge_Cuboid_" + seg.getCubeInstance().getName() + "_Step");
mergeCuboidDataStep.setMapReduceParams(cmd.toString());
- mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidFromStorageJob.class);
- mergeCuboidDataStep.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+ mergeCuboidDataStep.setMapReduceJobClass(MergeCuboidJob.class);
return mergeCuboidDataStep;
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
index bee030f..32e0f21 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java
@@ -46,6 +46,10 @@ import org.apache.kylin.job.execution.Output;
/**
*/
public class CubingJob extends DefaultChainedExecutable {
+
+ /** Cubing algorithm recorded for a cubing job. */
+ public enum AlgorithmEnum {
+     /** Layer cubing. */
+     LAYER,
+     /** In-mem cubing. */
+     INMEM
+ }
// KEYS of Output.extraInfo map, info passed across job steps
public static final String SOURCE_RECORD_COUNT = "sourceRecordCount";
@@ -169,7 +173,24 @@ public class CubingJob extends DefaultChainedExecutable {
public void setMapReduceWaitTime(long t) {
addExtraInfo(MAP_REDUCE_WAIT_TIME, t + "");
}
+
+ /** Records the chosen algorithm in this job's extra info under the key {@code "algorithm"}, stored as the enum constant name. */
+ public void setAlgorithm(AlgorithmEnum alg) {
+ addExtraInfo("algorithm", alg.name());
+ }
+
+ /**
+  * Reads the algorithm stored in this job's extra info under the key {@code "algorithm"}.
+  *
+  * @return the stored algorithm, or {@code null} when the extra info has no such entry
+  */
+ public AlgorithmEnum getAlgorithm() {
+     final String stored = getExtraInfo().get("algorithm");
+     if (stored == null) {
+         return null;
+     }
+     return AlgorithmEnum.valueOf(stored);
+ }
+ /** Tells whether the algorithm recorded for this job is {@code LAYER}; false when none is recorded. */
+ public boolean isLayerCubing() {
+     final AlgorithmEnum current = getAlgorithm();
+     return current == AlgorithmEnum.LAYER;
+ }
+
+ /** Tells whether the algorithm recorded for this job is {@code INMEM}; false when none is recorded. */
+ public boolean isInMemCubing() {
+     final AlgorithmEnum current = getAlgorithm();
+     return current == AlgorithmEnum.INMEM;
+ }
+
public long findSourceRecordCount() {
return Long.parseLong(findExtraInfo(SOURCE_RECORD_COUNT, "0"));
}
@@ -206,5 +227,5 @@ public class CubingJob extends DefaultChainedExecutable {
}
return dft;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
index 3ad51c5..844eb07 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMROutput2.java
@@ -1,13 +1,23 @@
-package org.apache.kylin.engine.mr;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
-import java.io.IOException;
+package org.apache.kylin.engine.mr;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.io.WritableComparable;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.Reducer;
-import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
@@ -17,93 +27,59 @@ public interface IMROutput2 {
public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(CubeSegment seg);
/**
- * Participate the batch cubing flow as the output side.
+ * Participate in the batch cubing flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 3.
*
* - Phase 1: Create Flat Table
* - Phase 2: Build Dictionary
- * - Phase 3: Build Cube (with StorageOutputFormat)
+ * - Phase 3: Build Cube
* - Phase 4: Update Metadata & Cleanup
*/
public interface IMRBatchCubingOutputSide2 {
- /** Return an output format for Phase 3: Build Cube MR */
- public IMRStorageOutputFormat getStorageOutputFormat();
-
/** Add step that executes after build dictionary and before build cube. */
public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow);
- /** Add step that executes after build cube. */
- public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow);
-
- /** Add step that does any necessary clean up. */
- public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
- }
-
- public IMRBatchMergeInputSide2 getBatchMergeInputSide(CubeSegment seg);
-
- public interface IMRBatchMergeInputSide2 {
- public IMRStorageInputFormat getStorageInputFormat();
- }
-
- /** Read in a cube as input of merge. Configure the input file format of mapper. */
- @SuppressWarnings("rawtypes")
- public interface IMRStorageInputFormat {
-
- /** Configure MR mapper class and input file format. */
- public void configureInput(Class<? extends Mapper> mapperClz, Class<? extends WritableComparable> outputKeyClz, Class<? extends Writable> outputValueClz, Job job) throws IOException;
-
- /** Given a mapper context, figure out which segment the mapper reads from. */
- public CubeSegment findSourceSegment(Mapper.Context context) throws IOException;
-
/**
- * Read in a row of cuboid. Given the input KV, de-serialize back cuboid ID, dimensions, and measures.
+ * Add step that saves cuboids from HDFS to storage.
*
- * @return <code>ByteArrayWritable</code> is the cuboid ID (8 bytes) + dimension values in dictionary encoding
- * <code>Object[]</code> is the measure values in order of <code>CubeDesc.getMeasures()</code>
+ * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+ * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * dictionary encoding; Mx is measure value serialization form.
*/
- public Pair<ByteArrayWritable, Object[]> parseMapperInput(Object inKey, Object inValue);
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
+
+ /** Add step that does any necessary clean up. */
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
}
/** Return a helper to participate in batch merge job flow. */
public IMRBatchMergeOutputSide2 getBatchMergeOutputSide(CubeSegment seg);
/**
- * Participate the batch merge flow as the output side.
+ * Participate in the batch merge flow as the output side. Responsible for saving
+ * the cuboid output to storage at the end of Phase 2.
*
* - Phase 1: Merge Dictionary
- * - Phase 2: Merge Cube (with StorageInputFormat & StorageOutputFormat)
+ * - Phase 2: Merge Cube
* - Phase 3: Update Metadata & Cleanup
*/
public interface IMRBatchMergeOutputSide2 {
- /** Return an input format for Phase 2: Merge Cube MR */
- public IMRStorageOutputFormat getStorageOutputFormat();
-
/** Add step that executes after merge dictionary and before merge cube. */
public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
- /** Add step that executes after merge cube. */
- public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow);
+ /**
+ * Add step that saves cuboid output from HDFS to storage.
+ *
+ * The cuboid output is a directory of sequence files, where key is CUBOID+D1+D2+..+Dn,
+ * value is M1+M2+..+Mm. CUBOID is 8 bytes cuboid ID; Dx is dimension value with
+ * dictionary encoding; Mx is measure value serialization form.
+ */
+ public void addStepPhase2_BuildCube(DefaultChainedExecutable jobFlow, String cuboidRootPath);
/** Add step that does any necessary clean up. */
public void addStepPhase3_Cleanup(DefaultChainedExecutable jobFlow);
}
- /** Write out a cube. Configure the output file format of reducer and do the actual K-V output. */
- @SuppressWarnings("rawtypes")
- public interface IMRStorageOutputFormat {
-
- /** Configure MR reducer class and output file format. */
- public void configureOutput(Class<? extends Reducer> reducer, String jobFlowId, Job job) throws IOException;
-
- /**
- * Write out a row of cuboid. Given the cuboid ID, dimensions, and measures, serialize in whatever
- * way and output to reducer context.
- *
- * @param key The cuboid ID (8 bytes) + dimension values in dictionary encoding
- * @param value The measure values in order of <code>CubeDesc.getMeasures()</code>
- * @param context The reducer context output goes to
- */
- public void doReducerOutput(ByteArrayWritable key, Object[] value, Reducer.Context context) throws IOException, InterruptedException;
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 55fa9e2..41c8b6b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -7,12 +7,10 @@ import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchCubingOutputSide;
import org.apache.kylin.engine.mr.IMROutput.IMRBatchMergeOutputSide;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
-import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeInputSide2;
import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
import org.apache.kylin.invertedindex.IISegment;
import org.apache.kylin.metadata.MetadataManager;
import org.apache.kylin.metadata.model.TableDesc;
-import org.apache.kylin.metadata.realization.IRealization;
import org.apache.kylin.metadata.realization.IRealizationSegment;
import org.apache.kylin.source.SourceFactory;
import org.apache.kylin.storage.StorageFactory;
@@ -47,10 +45,6 @@ public class MRUtil {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchCubingOutputSide(seg);
}
- public static IMRBatchMergeInputSide2 getBatchMergeInputSide2(CubeSegment seg) {
- return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeInputSide(seg);
- }
-
public static IMRBatchMergeOutputSide2 getBatchMergeOutputSide2(CubeSegment seg) {
return StorageFactory.createEngineAdapter(seg, IMROutput2.class).getBatchMergeOutputSide(seg);
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/2aedddf4/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index f031f76..748bac9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -72,8 +72,8 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
protected static final Logger logger = LoggerFactory.getLogger(AbstractHadoopJob.class);
protected static final Option OPTION_JOB_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Job name. For exmaple, Kylin_Cuboid_Builder-clsfd_v2_Step_22-D)").create("jobname");
- protected static final Option OPTION_JOB_FLOW_ID = OptionBuilder.withArgName("job flow ID").hasArg().isRequired(true).withDescription("job flow ID").create("jobflowid");
protected static final Option OPTION_CUBE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube name. For exmaple, flat_item_cube").create("cubename");
+ protected static final Option OPTION_CUBING_JOB_ID = OptionBuilder.withArgName("cubingJobId").hasArg().isRequired(false).withDescription("ID of cubing job executable").create("cubingJobId");
protected static final Option OPTION_II_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("II name. For exmaple, some_ii").create("iiname");
protected static final Option OPTION_SEGMENT_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Cube segment name)").create("segmentname");
protected static final Option OPTION_TABLE_NAME = OptionBuilder.withArgName("name").hasArg().isRequired(true).withDescription("Hive table name.").create("tablename");
@@ -477,4 +477,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
return this.job;
}
+ // tells MapReduceExecutable to skip this job
+ public boolean isSkipped() {
+ return false;
+ }
+
}