Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/31 13:20:02 UTC
[45/50] [abbrv] kylin git commit: Merge commit '5f2eff68d80ea6264d7590e14c052114c3cd6b74'
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --cc server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 9afec83,17f6b58..9789b70
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@@ -102,11 -98,8 +102,12 @@@ import org.apache.kylin.rest.request.Pr
import org.apache.kylin.rest.request.SQLRequest;
import org.apache.kylin.rest.response.SQLResponse;
import org.apache.kylin.rest.util.AclEvaluate;
+import org.apache.kylin.rest.util.AclPermissionUtil;
+ import org.apache.kylin.rest.util.QueryRequestLimits;
import org.apache.kylin.rest.util.TableauInterceptor;
+import org.apache.kylin.shaded.htrace.org.apache.htrace.Sampler;
+import org.apache.kylin.shaded.htrace.org.apache.htrace.Trace;
+import org.apache.kylin.shaded.htrace.org.apache.htrace.TraceScope;
import org.apache.kylin.storage.hybrid.HybridInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@@ -403,163 -403,121 +404,176 @@@ public class QueryService extends Basic
final QueryContext queryContext = QueryContext.current();
+ TraceScope scope = null;
- if (KylinConfig.getInstanceFromEnv().isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) {
++ if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) {
+ logger.info("Current query is under tracing");
+ HtraceInit.init();
+ scope = Trace.startSpan("query life cycle for " + queryContext.getQueryId(), Sampler.ALWAYS);
+ }
+ String traceUrl = getTraceUrl(scope);
+
try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
-
++ long startTime = System.currentTimeMillis();
++
++ SQLResponse sqlResponse = null;
String sql = sqlRequest.getSql();
String project = sqlRequest.getProject();
++ boolean isQueryCacheEnabled = isQueryCacheEnabled(kylinConfig);
logger.info("Using project: " + project);
logger.info("The original query: " + sql);
- final boolean isSelect = QueryUtil.isSelectStatement(sql);
- final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled()
- && kylinConfig.isPushDownUpdateEnabled();
- final int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject();
+ sql = QueryUtil.removeCommentInSql(sql);
- if (!isSelect && !isPushDownUpdateEnabled) {
- logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
- throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
- }
-
- SQLResponse sqlResponse = null;
+ Pair<Boolean, String> result = TempStatementUtil.handleTempStatement(sql, kylinConfig);
-
- boolean isTempStatement = result.getFirst();
++ boolean isCreateTempStatement = result.getFirst();
+ sql = result.getSecond();
+ sqlRequest.setSql(sql);
- final boolean isSelect = QueryUtil.isSelectStatement(sql);
-
- long startTime = System.currentTimeMillis();
-
- SQLResponse sqlResponse = null;
- boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(),
- "query cache disabled in KylinConfig") && //
- checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
-
- if (queryCacheEnabled) {
- try {
- // Check project level query request concurrency limitation per query server
- if (!QueryRequestLimits.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) {
- logger.warn(
- "Directly return exception as too many concurrent query requests for project:" + project);
- throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
++ // try some cheap executions
++ if (sqlResponse == null && isQueryInspect) {
++ sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql());
++ }
++
++ if (sqlResponse == null && isCreateTempStatement) {
++ sqlResponse = new SQLResponse(null, null, 0, false, null);
++ }
++
++ if (sqlResponse == null && isQueryCacheEnabled) {
+ sqlResponse = searchQueryInCache(sqlRequest);
+ Trace.addTimelineAnnotation("query cache searched");
- } else {
- Trace.addTimelineAnnotation("query cache skip search");
+ }
-
- try {
- if (null == sqlResponse) {
- if (isQueryInspect) {
- // set query sql to exception message string
- sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql());
- } else if (isTempStatement) {
- sqlResponse = new SQLResponse(null, null, 0, false, null);
- } else if (isSelect) {
- sqlResponse = query(sqlRequest);
- Trace.addTimelineAnnotation("query almost done");
- } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) {
- sqlResponse = update(sqlRequest);
- Trace.addTimelineAnnotation("update query almost done");
- } else {
- logger.debug(
- "Directly return exception as the sql is unsupported, and query pushdown is disabled");
- throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
- }
-
- long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
- long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
- long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();
- sqlResponse.setDuration(System.currentTimeMillis() - startTime);
- logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
- String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
- String.valueOf(sqlResponse.getTotalScanCount()));
- if (checkCondition(queryCacheEnabled, "query cache is disabled") //
- && checkCondition(!sqlResponse.getIsException(), "query has exception") //
- && checkCondition(!(sqlResponse.isPushDown()
- && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
- "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
- && checkCondition(
- sqlResponse.getDuration() > durationThreshold
- || sqlResponse.getTotalScanCount() > scanCountThreshold
- || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
- "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",
- sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(),
- scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
- && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
- "query response is too large: {} ({})", sqlResponse.getResults().size(),
- kylinConfig.getLargeQueryThreshold())) {
- cacheManager.getCache(SUCCESS_QUERY_CACHE)
- .put(new Element(sqlRequest.getCacheKey(), sqlResponse));
- }
- Trace.addTimelineAnnotation("response from execution");
-
- } else {
- sqlResponse.setDuration(System.currentTimeMillis() - startTime);
- sqlResponse.setTotalScanCount(0);
- sqlResponse.setTotalScanBytes(0);
- Trace.addTimelineAnnotation("response from cache");
++
++ // real execution if required
++ if (sqlResponse == null) {
++ try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) {
++ sqlResponse = queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
}
++ } else {
++ Trace.addTimelineAnnotation("response without real execution");
++ }
- long startTime = System.currentTimeMillis();
-
- // force clear the query context before a new query
- OLAPContext.clearThreadLocalContexts();
++ // check authorization before return, since the response may come from cache
++ if (!sqlResponse.getIsException())
+ checkQueryAuth(sqlResponse, project);
- } catch (Throwable e) { // calcite may throw AssertError
- logger.error("Exception while executing query", e);
- String errMsg = makeErrorMsgUserFriendly(e);
-
- sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
- sqlResponse.setTotalScanCount(queryContext.getScannedRows());
- sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
-
- if (queryCacheEnabled && e.getCause() != null
- && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
- Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
- exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
- boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(),
- "query cache disabled in KylinConfig") && //
- checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
- if (queryCacheEnabled) {
- sqlResponse = searchQueryInCache(sqlRequest);
-- }
- Trace.addTimelineAnnotation("error response");
- }
-
++ sqlResponse.setDuration(System.currentTimeMillis() - startTime);
+ sqlResponse.setTraceUrl(traceUrl);
+ logQuery(sqlRequest, sqlResponse);
+ try {
+ recordMetric(sqlRequest, sqlResponse);
+ } catch (Throwable th) {
+ logger.warn("Write metric error.", th);
+ }
+ if (sqlResponse.getIsException())
+ throw new InternalErrorException(sqlResponse.getExceptionMessage());
- try {
- if (null == sqlResponse) {
- if (isSelect) {
- sqlResponse = query(sqlRequest);
- } else if (isPushDownUpdateEnabled) {
- sqlResponse = update(sqlRequest);
- }
+ return sqlResponse;
- long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
- long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
- long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();
- sqlResponse.setDuration(System.currentTimeMillis() - startTime);
- logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
- String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
- String.valueOf(sqlResponse.getTotalScanCount()));
- if (checkCondition(queryCacheEnabled, "query cache is disabled") //
- && checkCondition(!sqlResponse.getIsException(), "query has exception") //
- && checkCondition(!(sqlResponse.isPushDown()
- && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
- "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
- && checkCondition(
- sqlResponse.getDuration() > durationThreshold
- || sqlResponse.getTotalScanCount() > scanCountThreshold
- || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
- "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",
- sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(),
- scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
- && checkCondition(
- sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
- "query response is too large: {} ({})", sqlResponse.getResults().size(),
- kylinConfig.getLargeQueryThreshold())) {
- cacheManager.getCache(SUCCESS_QUERY_CACHE)
- .put(new Element(sqlRequest.getCacheKey(), sqlResponse));
- }
+ } finally {
+ BackdoorToggles.cleanToggles();
+ QueryContext.reset();
+ if (scope != null) {
+ scope.close();
+ }
+ }
+ }
- } else {
- sqlResponse.setDuration(System.currentTimeMillis() - startTime);
- sqlResponse.setTotalScanCount(0);
- sqlResponse.setTotalScanBytes(0);
- }
++ private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) {
++ KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
++ Message msg = MsgPicker.getMsg();
++
++ SQLResponse sqlResponse = null;
++ try {
++ final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
++ if (isSelect) {
++ sqlResponse = query(sqlRequest);
++ Trace.addTimelineAnnotation("query almost done");
++ } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) {
++ sqlResponse = update(sqlRequest);
++ Trace.addTimelineAnnotation("update query almost done");
++ } else {
++ logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
++ throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
++ }
+
- checkQueryAuth(sqlResponse, project, secureEnabled);
++ long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
++ long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
++ long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();
++ sqlResponse.setDuration(System.currentTimeMillis() - startTime);
++ logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
++ String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
++ String.valueOf(sqlResponse.getTotalScanCount()));
++ if (checkCondition(queryCacheEnabled, "query cache is disabled") //
++ && checkCondition(!sqlResponse.getIsException(), "query has exception") //
++ && checkCondition(
++ !(sqlResponse.isPushDown()
++ && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
++ "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
++ && checkCondition(
++ sqlResponse.getDuration() > durationThreshold
++ || sqlResponse.getTotalScanCount() > scanCountThreshold
++ || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
++ "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",
++ sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(),
++ scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
++ && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
++ "query response is too large: {} ({})", sqlResponse.getResults().size(),
++ kylinConfig.getLargeQueryThreshold())) {
++ cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest.getCacheKey(), sqlResponse));
++ }
++ Trace.addTimelineAnnotation("response from execution");
+
- } catch (Throwable e) { // calcite may throw AssertError
- logger.error("Exception while executing query", e);
- String errMsg = makeErrorMsgUserFriendly(e);
++ } catch (Throwable e) { // calcite may throw AssertError
++ logger.error("Exception while executing query", e);
++ String errMsg = makeErrorMsgUserFriendly(e);
+
- sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
- sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e));
- sqlResponse.setTotalScanCount(queryContext.getScannedRows());
- sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
- sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
++ sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
++ QueryContext queryContext = QueryContext.current();
++ sqlResponse.setTotalScanCount(queryContext.getScannedRows());
++ sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
+
- if (queryCacheEnabled && e.getCause() != null
- && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
- Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
- exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
- }
- }
- } finally {
- QueryRequestLimits.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery);
++ if (queryCacheEnabled && e.getCause() != null
++ && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
++ Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
++ exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
+ }
++ Trace.addTimelineAnnotation("error response");
++ }
++ return sqlResponse;
++ }
+
- logQuery(sqlRequest, sqlResponse);
++ private boolean isQueryCacheEnabled(KylinConfig kylinConfig) {
++ return checkCondition(kylinConfig.isQueryCacheEnabled(),
++ "query cache disabled in KylinConfig") && //
++ checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
++ }
+
- QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
- QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse);
+ protected void recordMetric(SQLRequest sqlRequest, SQLResponse sqlResponse) throws UnknownHostException {
+ QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
+ QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse);
+ }
- if (sqlResponse.getIsException())
- throw new InternalErrorException(sqlResponse.getExceptionMessage());
+ private String getTraceUrl(TraceScope scope) {
+ if (scope == null) {
+ return null;
+ }
- return sqlResponse;
+ String hostname = System.getProperty("zipkin.collector-hostname");
+ if (StringUtils.isEmpty(hostname)) {
+ try {
+ hostname = InetAddress.getLocalHost().getHostName();
+ } catch (UnknownHostException e) {
+ logger.debug("failed to get trace url due to " + e.getMessage());
+ return null;
+ }
+ }
- } finally {
- BackdoorToggles.cleanToggles();
- QueryContext.reset();
+ String port = System.getProperty("zipkin.web-ui-port");
+ if (StringUtils.isEmpty(port)) {
+ port = "9411";
}
+
+ return "http://" + hostname + ":" + port + "/zipkin/traces/" + Long.toHexString(scope.getSpan().getTraceId());
}
private String getUserName() {
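The merged doQueryWithCache above replaces the old nested if/else with a flat null-cascade: each "cheap execution" step fills sqlResponse only when every earlier step left it null, and real execution runs last under the QueryRequestLimits guard. A self-contained sketch of that control-flow pattern, with Kylin's services replaced by illustrative stand-ins (none of these names are Kylin's API):

    // Illustrative only; mirrors the null-cascade shape of the merge result above.
    public class NullCascadeSketch {
        static String searchCache(String sql) { return null; }          // stand-in for searchQueryInCache
        static String execute(String sql) { return "rows for " + sql; } // stand-in for query()/update()

        static String handle(String sql, boolean inspectOnly, boolean cacheEnabled) {
            String response = null;
            // try some cheap executions first
            if (response == null && inspectOnly)
                response = "inspect: " + sql;
            if (response == null && cacheEnabled)
                response = searchCache(sql);
            // real execution only if no cheap path produced a response
            if (response == null)
                response = execute(sql);
            return response;
        }

        public static void main(String[] args) {
            System.out.println(handle("select 1", false, true));
        }
    }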
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
----------------------------------------------------------------------
diff --cc server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index d218bc2,901ac46..8a6dd96
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@@ -111,12 -115,27 +111,11 @@@ public class TableService extends Basic
public String[] loadHiveTablesToProject(String[] tables, String project) throws Exception {
aclEvaluate.checkProjectAdminPermission(project);
- // de-dup
- SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
- for (String fullTableName : tables) {
- String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
- db2tables.put(parts[0], parts[1]);
- }
-
- // load all tables first
- List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList();
- ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
- for (Map.Entry<String, String> entry : db2tables.entries()) {
- Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project);
- TableDesc tableDesc = pair.getFirst();
- Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase()));
- Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase()));
- Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey().toUpperCase() + "." + entry.getValue().toUpperCase()));
- TableExtDesc extDesc = pair.getSecond();
- Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity()));
- allMeta.add(pair);
- }
+ List<Pair<TableDesc, TableExtDesc>> allMeta = getAllMeta(tables, project);
+ return loadHiveTablesToProject(project, allMeta);
+ }
+ String[] loadHiveTablesToProject(String project, List<Pair<TableDesc, TableExtDesc>> allMeta) throws Exception {
-
// do schema check
TableMetadataManager metaMgr = getTableManager();
CubeManager cubeMgr = getCubeManager();
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
----------------------------------------------------------------------
diff --cc server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
index 0000000,cddaa12..eb8eee8
mode 000000,100644..100644
--- a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
@@@ -1,0 -1,90 +1,121 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.kylin.rest.util;
+
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+
++import org.apache.kylin.common.KylinConfig;
++import org.apache.kylin.metadata.project.ProjectInstance;
++import org.apache.kylin.metadata.project.ProjectManager;
++import org.apache.kylin.rest.exception.BadRequestException;
++import org.apache.kylin.rest.msg.Message;
++import org.apache.kylin.rest.msg.MsgPicker;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+
+ import com.google.common.cache.CacheBuilder;
+ import com.google.common.cache.CacheLoader;
+ import com.google.common.cache.LoadingCache;
+ import com.google.common.cache.RemovalListener;
+ import com.google.common.cache.RemovalNotification;
+
-public class QueryRequestLimits {
++public class QueryRequestLimits implements AutoCloseable {
+ private static final Logger logger = LoggerFactory.getLogger(QueryRequestLimits.class);
+
+ private static LoadingCache<String, AtomicInteger> runningStats = CacheBuilder.newBuilder()
+ .removalListener(new RemovalListener<String, AtomicInteger>() {
+ @Override
+ public void onRemoval(RemovalNotification<String, AtomicInteger> notification) {
+ logger.info("Current running query number " + notification.getValue().get() + " for project "
+ + notification.getKey() + " is removed due to " + notification.getCause());
+ }
+ }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() {
+ @Override
+ public AtomicInteger load(String s) throws Exception {
+ return new AtomicInteger(0);
+ }
+ });
+
- public static boolean openQueryRequest(String project, int maxConcurrentQuery) {
++ static boolean openQueryRequest(String project, int maxConcurrentQuery) {
+ if (maxConcurrentQuery == 0) {
+ return true;
+ }
+ try {
+ AtomicInteger nRunningQueries = runningStats.get(project);
+ for (;;) {
+ int nRunning = nRunningQueries.get();
+ if (nRunning < maxConcurrentQuery) {
+ if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) {
+ return true;
+ }
+ } else {
+ return false;
+ }
+ }
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
- public static void closeQueryRequest(String project, int maxConcurrentQuery) {
++ static void closeQueryRequest(String project, int maxConcurrentQuery) {
+ if (maxConcurrentQuery == 0) {
+ return;
+ }
+ AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
+ if (nRunningQueries != null) {
+ nRunningQueries.decrementAndGet();
+ }
+ }
+
+ public static Integer getCurrentRunningQuery(String project) {
+ AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
+ if (nRunningQueries != null) {
+ return nRunningQueries.get();
+ } else {
+ return null;
+ }
+ }
++
++ // ============================================================================
++
++ private final String project;
++ private final int maxConcurrentQuery;
++
++ public QueryRequestLimits(String project) {
++ this.project = project;
++
++ ProjectManager mgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
++ ProjectInstance prj = mgr.getProject(project);
++ this.maxConcurrentQuery = prj.getConfig().getQueryConcurrentRunningThresholdForProject();
++
++ boolean ok = openQueryRequest(project, maxConcurrentQuery);
++ if (!ok) {
++ Message msg = MsgPicker.getMsg();
++ logger.warn("Directly return exception as too many concurrent query requests for project:" + project);
++ throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
++ }
++ }
++
++ @Override
++ public void close() {
++ closeQueryRequest(project, maxConcurrentQuery);
++ }
+ }
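Because the class now implements AutoCloseable, the per-project slot can be held with try-with-resources, which is how the QueryService diff above consumes it. A hedged usage sketch (the project name is illustrative):

    // The constructor acquires a slot and throws BadRequestException when the
    // project's concurrent-query threshold is exceeded; close() releases it.
    try (QueryRequestLimits limit = new QueryRequestLimits("some_project")) {
        // run the query while the slot is held, e.g.
        // sqlResponse = queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
    }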
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java
----------------------------------------------------------------------
diff --cc server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java
index ada78cd,7e513ae..be20419
--- a/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java
@@@ -169,4 -108,31 +169,5 @@@ public class ValidateUtil
}
return cols;
}
+
- public Set<String> getAllUsers(String project) throws IOException {
- Set<String> allUsers = new HashSet<>();
- // add users that is global admin
- for (ManagedUser managedUser : userService.listUsers()) {
- if (managedUser.getAuthorities().contains(new SimpleGrantedAuthority(Constant.ROLE_ADMIN))) {
- allUsers.add(managedUser.getUsername());
- }
- }
-
- // add users that has project permission
- ProjectInstance prj = projectService.getProjectManager().getProject(project);
- AclEntity ae = accessService.getAclEntity("ProjectInstance", prj.getUuid());
- Acl acl = accessService.getAcl(ae);
- if (acl != null && acl.getEntries() != null) {
- for (AccessControlEntry ace : acl.getEntries()) {
- allUsers.add(((PrincipalSid) ace.getSid()).getPrincipal());
- }
- }
- return allUsers;
- }
-
- public void validateArgs(String... args) {
- for (String arg : args) {
- Preconditions.checkState(!StringUtils.isEmpty(arg));
- }
- }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/main/resources/kylinSecurity.xml
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java
----------------------------------------------------------------------
diff --cc server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java
index fc97eff,414a241..528b18e
--- a/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java
@@@ -23,10 -23,9 +23,10 @@@ import java.sql.SQLException
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.metadata.MetadataConstants;
import org.apache.kylin.metadata.acl.TableACLManager;
import org.apache.kylin.query.security.AccessDeniedException;
- import org.apache.kylin.query.security.QuerACLTestUtil;
+ import org.apache.kylin.query.security.QueryACLTestUtil;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Before;
@@@ -59,26 -55,26 +59,26 @@@ public class QueryWithTableACLTest exte
@Test
public void testFailQuery() throws SQLException, IOException {
- QuerACLTestUtil.setUser(MODELER);
- QuerACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE");
+ QueryACLTestUtil.setUser(MODELER);
+ QueryACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE");
- QuerACLTestUtil.setUser(ADMIN);
+ QueryACLTestUtil.setUser(ADMIN);
- TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE);
+ TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE, MetadataConstants.TYPE_USER);
thrown.expectCause(CoreMatchers.isA(AccessDeniedException.class));
thrown.expectMessage(CoreMatchers.containsString("Query failed.Access table:DEFAULT.STREAMING_TABLE denied"));
- QuerACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE");
+ QueryACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE");
}
@Test
public void testFailQueryWithCountStar() throws SQLException, IOException {
- QuerACLTestUtil.setUser(MODELER);
- QuerACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE");
+ QueryACLTestUtil.setUser(MODELER);
+ QueryACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE");
- QuerACLTestUtil.setUser(ADMIN);
+ QueryACLTestUtil.setUser(ADMIN);
- TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE);
+ TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE, MetadataConstants.TYPE_USER);
thrown.expectCause(CoreMatchers.isA(AccessDeniedException.class));
thrown.expectMessage(CoreMatchers.containsString("Query failed.Access table:DEFAULT.STREAMING_TABLE denied"));
- QuerACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE");
+ QueryACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE");
}
@After
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java
----------------------------------------------------------------------
diff --cc server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java
index 75b8f21,0000000..3ee14e0
mode 100644,000000..100644
--- a/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java
@@@ -1,72 -1,0 +1,62 @@@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.rest.service;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.junit.Assert;
+import org.junit.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+
+/**
+ *
+ */
+public class AdminServiceTest extends ServiceTestBase {
+
+ @Autowired
+ @Qualifier("adminService")
+ private AdminService adminService;
+
+ @Test
+ public void testGetPublicConfig() throws IOException {
+ // make ../examples/test_metadata/kylin.properties empty
+ File file = FileUtils.getFile(LOCALMETA_TEMP_DATA + "kylin.properties");
+ FileUtils.deleteQuietly(file);
+ FileUtils.touch(file);
+ String path = Thread.currentThread().getContextClassLoader().getResource("kylin.properties").getPath();
+ KylinConfig.setKylinConfigThreadLocal(KylinConfig.createInstanceFromUri(path));
- String expected = "kylin.web.link-streaming-guide=http://kylin.apache.org/\n" +
- "kylin.web.contact-mail=\n" +
- "kylin.query.cache-enabled=true\n" +
- "kylin.web.link-diagnostic=\n" +
- "kylin.web.help.length=4\n" +
- "kylin.web.timezone=GMT+8\n" +
- "kylin.server.external-acl-provider=\n" +
- "kylin.storage.default=2\n" +
- "kylin.web.help=\n" +
- "kylin.web.export-allow-other=true\n" +
- "kylin.web.link-hadoop=\n" +
- "kylin.web.hide-measures=\n" +
- "kylin.htrace.show-gui-trace-toggle=false\n" +
- "kylin.web.export-allow-admin=true\n" +
- "kylin.env=QA\n" +
- "kylin.web.hive-limit=20\n" +
- "kylin.engine.default=2\n" +
- "kylin.web.help.3=onboard|Cube Design Tutorial|\n" +
- "kylin.web.help.2=tableau|Tableau Guide|\n" +
- "kylin.web.help.1=odbc|ODBC Driver|\n" +
- "kylin.web.help.0=start|Getting Started|\n" +
- "kylin.security.profile=testing\n";
- Assert.assertEquals(expected, adminService.getPublicConfig());
++
++ String publicConfig = adminService.getPublicConfig();
++
++ Assert.assertFalse(publicConfig.contains("kylin.metadata.data-model-manager-impl"));
++ Assert.assertFalse(publicConfig.contains("kylin.dictionary.use-forest-trie"));
++ Assert.assertFalse(publicConfig.contains("kylin.cube.segment-advisor"));
++ Assert.assertFalse(publicConfig.contains("kylin.job.use-remote-cli"));
++ Assert.assertFalse(publicConfig.contains("kylin.job.scheduler.provider"));
++ Assert.assertFalse(publicConfig.contains("kylin.engine.mr.job-jar"));
++ Assert.assertFalse(publicConfig.contains("kylin.engine.spark.sanity-check-enabled"));
++ Assert.assertFalse(publicConfig.contains("kylin.storage.provider"));
++ Assert.assertFalse(publicConfig.contains("kylin.query.convert-create-table-to-with"));
++ Assert.assertFalse(publicConfig.contains("kylin.server.init-tasks"));
+ }
+}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --cc source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 0b23121,c700d82..e4564d0
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@@ -223,8 -224,10 +224,10 @@@ public class HiveMRInput implements IMR
.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n");
createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + intermediate + "';\n");
createIntermediateTableHql
+ .append("ALTER TABLE " + intermediate + " SET TBLPROPERTIES('auto.purge'='true');\n");
+ createIntermediateTableHql
- .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n");
- hiveCmdBuilder.addStatement(createIntermediateTableHql.toString());
+ .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + "\n");
+ hiveCmdBuilder.addStatementWithRedistributeBy(createIntermediateTableHql);
hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";";
}
}
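The change above does two things: it marks the intermediate table with 'auto.purge'='true' so dropping it bypasses the HDFS trash, and it leaves the INSERT statement unterminated so that addStatementWithRedistributeBy can (presumably, judging by its name) append a DISTRIBUTE BY clause before closing the statement. A sketch of the statement sequence being built, with hypothetical table names and paths:

    // Hypothetical values for illustration only.
    String intermediate = "kylin_intermediate_view_x";
    String identity = "DEFAULT.VIEW_X";
    String jobWorkingDir = "/kylin/job_work_dir";

    StringBuilder hql = new StringBuilder();
    hql.append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n");
    hql.append("LOCATION '" + jobWorkingDir + "/" + intermediate + "';\n");
    // auto.purge lets DROP TABLE skip the trash when the job cleans up
    hql.append("ALTER TABLE " + intermediate + " SET TBLPROPERTIES('auto.purge'='true');\n");
    // no trailing ';' here -- addStatementWithRedistributeBy finishes the statement
    hql.append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + "\n");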
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
----------------------------------------------------------------------
diff --cc source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 59780e6,d05f14e..7fc26de
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@@ -133,7 -134,7 +134,7 @@@ public class JdbcHiveMRInput extends Hi
if (partitionDesc.isPartitioned()) {
partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
- flatDesc.getSegment(), flatDesc.getSegRange());
- flatDesc.getSegRange());
++ flatDesc.getSegment(), flatDesc.getSegRange());
}
String splitTable;
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --cc source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index 37bf8ff,fe5812b..c045ff7
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@@ -63,20 -63,20 +63,20 @@@ public class MergeOffsetStep extends Ab
final CubeSegment first = mergingSegs.get(0);
final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
- segment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
- segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
- segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
+ segCopy.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
+ segCopy.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
+ segCopy.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
- segment.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd()));
+ segCopy.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd()));
- CubeUpdate cubeBuilder = new CubeUpdate(cube);
- cubeBuilder.setToUpdateSegs(segment);
+ CubeUpdate update = new CubeUpdate(cubeCopy);
+ update.setToUpdateSegs(segCopy);
try {
- cubeManager.updateCube(cubeBuilder);
- return new ExecuteResult();
+ cubeManager.updateCube(update);
- return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
++ return ExecuteResult.createSucceed();
} catch (IOException e) {
logger.error("fail to update cube segment offset", e);
- return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+ return ExecuteResult.createError(e);
}
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 073f183,838112f..68aa172
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@@ -28,9 -30,14 +29,9 @@@ import org.apache.commons.cli.Options
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Bytes;
@@@ -71,6 -79,8 +73,7 @@@ public class CreateHTableJob extends Ab
options.addOption(OPTION_CUBE_NAME);
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_PARTITION_FILE_PATH);
- options.addOption(OPTION_STATISTICS_ENABLED);
+ options.addOption(OPTION_CUBOID_MODE);
parseOptions(options, args);
partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@@ -81,12 -92,31 +84,29 @@@
cubeDesc = cube.getDescriptor();
kylinConfig = cube.getConfig();
segmentID = getOptionValue(OPTION_SEGMENT_ID);
+ cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
CubeSegment cubeSegment = cube.getSegmentById(segmentID);
- Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
-
byte[][] splitKeys;
- final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
- if (statsEnabled) {
- Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, null, kylinConfig).getCuboidSizeMap();
- Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
- if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
- Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
- for (Long cuboid : buildingCuboids) {
- Double cuboidSize = cuboidSizeMap.get(cuboid);
- if (cuboidSize == null) {
- logger.warn(cuboid + "cuboid's size is null will replace by 0");
- cuboidSize = 0.0;
- }
- optimizedCuboidSizeMap.put(cuboid, cuboidSize);
++ Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
++
++ // for cube planner; cuboidSizeMap stays unchanged if cube planner is disabled
++ Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
++ if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
++ Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
++ for (Long cuboid : buildingCuboids) {
++ Double cuboidSize = cuboidSizeMap.get(cuboid);
++ if (cuboidSize == null) {
++ logger.warn(cuboid + " cuboid's size is null, will replace with 0");
++ cuboidSize = 0.0;
+ }
- cuboidSizeMap = optimizedCuboidSizeMap;
++ optimizedCuboidSizeMap.put(cuboid, cuboidSize);
+ }
- splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent());
- } else {
- splitKeys = getRegionSplits(conf, partitionFilePath);
++ cuboidSizeMap = optimizedCuboidSizeMap;
+ }
++
+ splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment,
+ partitionFilePath.getParent());
CubeHTableUtil.createHTable(cubeSegment, splitKeys);
return 0;
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 06c0923,8817cb2..ea372d9
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@@ -185,4 -199,30 +199,30 @@@ public class HBaseMROutput2Transition i
throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
}
}
+
+ public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) {
+ return new IMRBatchOptimizeOutputSide2() {
+ HBaseMRSteps steps = new HBaseMRSteps(seg);
+
+ @Override
+ public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
- jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId(), CuboidModeEnum.RECOMMEND));
++ jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId(), CuboidModeEnum.RECOMMEND));
+ }
+
+ @Override
+ public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+ jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+ jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+ }
+
+ @Override
+ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+ steps.addOptimizeGarbageCollectionSteps(jobFlow);
+ }
+
+ @Override
+ public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
+ steps.addCheckpointGarbageCollectionSteps(jobFlow);
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 3dd61d8,67c94ad..99cc9a7
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@@ -74,14 -76,31 +76,18 @@@ public class HBaseMRSteps extends JobBu
return rowkeyDistributionStep;
}
-
public HadoopShellExecutable createCreateHTableStep(String jobId) {
- return createCreateHTableStep(jobId, false);
++ return createCreateHTableStep(jobId, CuboidModeEnum.CURRENT);
+ }
-
- public HadoopShellExecutable createCreateHTableStepWithStats(String jobId) {
- return createCreateHTableStep(jobId, true);
- }
-
- public HadoopShellExecutable createCreateHTableStepWithStats(String jobId, CuboidModeEnum cuboidMode) {
- return createCreateHTableStep(jobId, true, cuboidMode);
- }
-
- private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
- return createCreateHTableStep(jobId, withStats, CuboidModeEnum.CURRENT);
- }
-
- private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats, CuboidModeEnum cuboidMode) {
++
++ public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum cuboidMode) {
HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
StringBuilder cmd = new StringBuilder();
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
- appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
+ appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
createHtableStep.setJobParams(cmd.toString());
createHtableStep.setJobClass(CreateHTableJob.class);
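With the createCreateHTableStepWithStats variants removed, callers select the cuboid mode directly. Both call sites visible in this merge follow the shape below (the job flow and segment come from the surrounding build engine, so this is a sketch rather than runnable code):

    HBaseMRSteps steps = new HBaseMRSteps(seg);
    // normal build: the no-arg overload defaults to CuboidModeEnum.CURRENT
    jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId()));
    // optimize build: RECOMMEND cuboids, as in getBatchOptimizeOutputSide above
    jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId(), CuboidModeEnum.RECOMMEND));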
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
----------------------------------------------------------------------
diff --cc tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 0000000,2be381c..666d23a
mode 000000,100644..100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@@ -1,0 -1,673 +1,656 @@@
+ /*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ package org.apache.kylin.tool.metrics.systemcube;
+
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeDescManager;
+ import org.apache.kylin.cube.model.AggregationGroup;
+ import org.apache.kylin.cube.model.CubeDesc;
+ import org.apache.kylin.cube.model.DimensionDesc;
+ import org.apache.kylin.cube.model.HBaseColumnDesc;
+ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+ import org.apache.kylin.cube.model.HBaseMappingDesc;
+ import org.apache.kylin.cube.model.RowKeyColDesc;
+ import org.apache.kylin.cube.model.RowKeyDesc;
+ import org.apache.kylin.cube.model.SelectRule;
+ import org.apache.kylin.dimension.DictionaryDimEnc;
+ import org.apache.kylin.job.constant.JobStatusEnum;
+ import org.apache.kylin.measure.percentile.PercentileMeasureType;
+ import org.apache.kylin.metadata.model.FunctionDesc;
+ import org.apache.kylin.metadata.model.IEngineAware;
+ import org.apache.kylin.metadata.model.MeasureDesc;
+ import org.apache.kylin.metadata.model.ParameterDesc;
+ import org.apache.kylin.metrics.lib.SinkTool;
+ import org.apache.kylin.metrics.lib.impl.RecordEvent;
+ import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
+ import org.apache.kylin.metrics.property.JobPropertyEnum;
+ import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
+ import org.apache.kylin.metrics.property.QueryPropertyEnum;
+ import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool;
+
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+
+ public class CubeDescCreator {
+
- public static void main(String[] args) throws Exception {
- // KylinConfig.setSandboxEnvIfPossible();
- KylinConfig config = KylinConfig.getInstanceFromEnv();
-
- CubeDesc kylinCubeDesc = generateKylinCubeDescForMetricsQuery(config, new HiveSinkTool());
- ByteArrayOutputStream buf = new ByteArrayOutputStream();
- DataOutputStream dout = new DataOutputStream(buf);
- CubeDescManager.CUBE_DESC_SERIALIZER.serialize(kylinCubeDesc, dout);
- dout.close();
- buf.close();
- System.out.println(buf.toString());
- }
-
+ public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig config, SinkTool sinkTool) {
+ String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQuery());
+
+ //Set for dimensions
+ List<String> dimensions = ModelCreator.getDimensionsForMetricsQuery();
+ dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+ dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+
+ List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+ for (String dimensionName : dimensions) {
+ dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+ }
+
+ //Set for measures
+ List<String> measures = ModelCreator.getMeasuresForMetricsQuery();
+ measures.remove(QueryPropertyEnum.ID_CODE.toString());
+ List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1);
+
+ List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuery();
+ Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
+ for (Pair<String, String> entry : measureTypeList) {
+ measureTypeMap.put(entry.getKey(), entry.getValue());
+ }
+ measureDescList.add(getMeasureCount());
+ measureDescList.add(getMeasureMin(QueryPropertyEnum.TIME_COST.toString(),
+ measureTypeMap.get(QueryPropertyEnum.TIME_COST.toString())));
+ for (String measure : measures) {
+ measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
+ measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
+ }
+ measureDescList.add(getMeasureHLL(QueryPropertyEnum.ID_CODE.toString()));
+ measureDescList.add(getMeasurePercentile(QueryPropertyEnum.TIME_COST.toString()));
+
+ //Set for row key
+ RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+ int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.USER.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.PROJECT.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION_TYPE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.EXCEPTION.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.TYPE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
+ idx++;
+
+ RowKeyDesc rowKeyDesc = new RowKeyDesc();
+ rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+
+ //Set for aggregation group
+ String[][] hierarchy_dims = new String[2][];
+ hierarchy_dims[0] = getTimeHierarchy();
+ hierarchy_dims[1] = new String[2];
+ hierarchy_dims[1][0] = QueryPropertyEnum.REALIZATION_TYPE.toString();
+ hierarchy_dims[1][1] = QueryPropertyEnum.REALIZATION.toString();
+ for (int i = 0; i < hierarchy_dims.length; i++) {
+ hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+ }
+
+ SelectRule selectRule = new SelectRule();
+ selectRule.mandatoryDims = new String[0];
+ selectRule.hierarchyDims = hierarchy_dims;
+ selectRule.jointDims = new String[0][0];
+
+ AggregationGroup aggGroup = new AggregationGroup();
+ aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+ aggGroup.setSelectRule(selectRule);
+
+ //Set for hbase mapping
+ HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+ hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+
+ return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+ rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+ }
+
+ public static CubeDesc generateKylinCubeDescForMetricsQueryCube(KylinConfig config, SinkTool sinkTool) {
+ String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube());
+
+ //Set for dimensions
+ List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryCube();
+ dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+ dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+ dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
+ dimensions.remove(QueryCubePropertyEnum.PROJECT.toString());
+
+ List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+ for (String dimensionName : dimensions) {
+ dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+ }
+
+ //Set for measures
+ List<String> measures = ModelCreator.getMeasuresForMetricsQueryCube();
+ List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2);
+
+ List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryCube();
+ Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
+ for (Pair<String, String> entry : measureTypeList) {
+ measureTypeMap.put(entry.getKey(), entry.getValue());
+ }
+ measureDescList.add(getMeasureCount());
+ for (String measure : measures) {
+ measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
+ if (!measure.equals(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString())) {
+ measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
+ }
+ }
+
+ //Set for row key
+ RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+ int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.SEGMENT.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_SOURCE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_TARGET.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.FILTER_MASK.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_MATCH.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_SUCCESS.toString(), idx + 1);
+ idx++;
+
+ RowKeyDesc rowKeyDesc = new RowKeyDesc();
+ rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+
+ // Set up the aggregation group
+ String[] mandatoryDims = new String[] { QueryCubePropertyEnum.CUBE.toString() };
+ mandatoryDims = refineColumnWithTable(tableName, mandatoryDims);
+
+ String[][] hierarchyDims = new String[1][];
+ hierarchyDims[0] = getTimeHierarchy();
+ for (int i = 0; i < hierarchyDims.length; i++) {
+ hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
+ }
+
+ String[][] jointDims = new String[1][];
+ jointDims[0] = new String[] { QueryCubePropertyEnum.CUBOID_SOURCE.toString(),
+ QueryCubePropertyEnum.CUBOID_TARGET.toString() };
+ for (int i = 0; i < jointDims.length; i++) {
+ jointDims[i] = refineColumnWithTable(tableName, jointDims[i]);
+ }
+
+ SelectRule selectRule = new SelectRule();
+ selectRule.mandatoryDims = mandatoryDims;
+ selectRule.hierarchyDims = hierarchyDims;
+ selectRule.jointDims = jointDims;
+
+ AggregationGroup aggGroup = new AggregationGroup();
+ aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+ aggGroup.setSelectRule(selectRule);
+
+ // Set up the HBase mapping
+ HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+ hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+
+ return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+ rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+ }
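
The select rule built above is the crux of this aggregation group; as a sketch of its semantics (standard Kylin aggregation-group behavior):

    // - mandatoryDims: CUBE appears in every cuboid of the group;
    // - hierarchyDims: the time columns combine only as prefixes
    //   (YEAR; YEAR+MONTH; YEAR+MONTH+WEEK; ...), pruning cuboids that
    //   would group by a fine grain without its coarser ancestors;
    // - jointDims: CUBOID_SOURCE and CUBOID_TARGET always appear
    //   together, since the pair is only meaningful as a whole.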
+
+ public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig config, SinkTool sinkTool) {
+ String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall());
+
+ // Set up dimensions
+ List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryRPC();
+ dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+ dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+
+ List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+ for (String dimensionName : dimensions) {
+ dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+ }
+
+ // Set up measures
+ List<String> measures = ModelCreator.getMeasuresForMetricsQueryRPC();
+ List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1);
+
+ List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryRPC();
+ Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
+ for (Pair<String, String> entry : measureTypeList) {
+ measureTypeMap.put(entry.getKey(), entry.getValue());
+ }
+ measureDescList.add(getMeasureCount());
+ for (String measure : measures) {
+ measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
+ measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
+ }
+ measureDescList.add(getMeasurePercentile(QueryRPCPropertyEnum.CALL_TIME.toString()));
+
+ // Set up the row key
+ RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+ int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.PROJECT.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.REALIZATION.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.RPC_SERVER.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.EXCEPTION.toString(), idx + 1);
+ idx++;
+
+ RowKeyDesc rowKeyDesc = new RowKeyDesc();
+ rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+
+ // Set up the aggregation group
+ String[][] hierarchyDims = new String[1][];
+ hierarchyDims[0] = getTimeHierarchy();
+ for (int i = 0; i < hierarchyDims.length; i++) {
+ hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
+ }
+
+ SelectRule selectRule = new SelectRule();
+ selectRule.mandatoryDims = new String[0];
+ selectRule.hierarchyDims = hierarchyDims;
+ selectRule.jointDims = new String[0][0];
+
+ AggregationGroup aggGroup = new AggregationGroup();
+ aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+ aggGroup.setSelectRule(selectRule);
+
+ // Set up the HBase mapping
+ HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+ hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+
+ return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+ rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+ }
+
+ public static CubeDesc generateKylinCubeDescForMetricsJob(KylinConfig config, SinkTool sinkTool) {
+ String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJob());
+
+ // Set up dimensions
+ List<String> dimensions = ModelCreator.getDimensionsForMetricsJob();
+ dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+ dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+ dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
+
+ List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+ for (String dimensionName : dimensions) {
+ dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+ }
+
+ // Set up measures
+ List<String> measures = ModelCreator.getMeasuresForMetricsJob();
+ // count + (sum, max) per measure + min for every non-step-duration measure + percentile
+ List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize((measures.size() - 4) * 3 + 4 * 2 + 1 + 1);
+
+ Set<String> stepDuration = Sets.newHashSet();
+ stepDuration.add(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString());
+ stepDuration.add(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString());
+ stepDuration.add(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString());
+ stepDuration.add(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString());
+
+ List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsJob();
+ Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
+ for (Pair<String, String> entry : measureTypeList) {
+ measureTypeMap.put(entry.getKey(), entry.getValue());
+ }
+ measureDescList.add(getMeasureCount());
+ for (String measure : measures) {
+ measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
+ measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
+ if (!stepDuration.contains(measure)) {
+ measureDescList.add(getMeasureMin(measure, measureTypeMap.get(measure)));
+ }
+ }
+ measureDescList.add(getMeasurePercentile(JobPropertyEnum.BUILD_DURATION.toString()));
+
+ // Set up the row key
+ RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+ int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.USER.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.PROJECT.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.CUBE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.ALGORITHM.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.TYPE.toString(), idx + 1);
+ idx++;
+
+ RowKeyDesc rowKeyDesc = new RowKeyDesc();
+ rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+
+ // Set up the aggregation group
+ String[][] hierarchyDims = new String[1][];
+ hierarchyDims[0] = getTimeHierarchy();
+ for (int i = 0; i < hierarchyDims.length; i++) {
+ hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
+ }
+
+ SelectRule selectRule = new SelectRule();
+ selectRule.mandatoryDims = new String[0];
+ selectRule.hierarchyDims = hierarchyDims;
+ selectRule.jointDims = new String[0][0];
+
+ AggregationGroup aggGroup = new AggregationGroup();
+ aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+ aggGroup.setSelectRule(selectRule);
+
+ // Set up the HBase mapping
+ HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+ hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+
+ return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+ rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+ }
+
+ public static CubeDesc generateKylinCubeDescForMetricsJobException(KylinConfig config, SinkTool sinkTool) {
+ String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJobException());
+
+ // Set up dimensions
+ List<String> dimensions = ModelCreator.getDimensionsForMetricsJobException();
+ dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+ dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+ dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
+
+ List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+ for (String dimensionName : dimensions) {
+ dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+ }
+
+ // Set up measures
+ List<String> measures = ModelCreator.getMeasuresForMetricsJobException();
+ measures.remove(JobPropertyEnum.ID_CODE.toString());
+ List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(1);
+
+ measureDescList.add(getMeasureCount());
+
+ // Set up the row key
+ RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+ int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.USER.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.PROJECT.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.CUBE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.ALGORITHM.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.TYPE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.EXCEPTION.toString(), idx + 1);
+ idx++;
+
+ RowKeyDesc rowKeyDesc = new RowKeyDesc();
+ rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+
+ // Set up the aggregation group
+ String[][] hierarchyDims = new String[1][];
+ hierarchyDims[0] = getTimeHierarchy();
+ for (int i = 0; i < hierarchyDims.length; i++) {
+ hierarchyDims[i] = refineColumnWithTable(tableName, hierarchyDims[i]);
+ }
+
+ SelectRule selectRule = new SelectRule();
+ selectRule.mandatoryDims = new String[0];
+ selectRule.hierarchyDims = hierarchyDims;
+ selectRule.jointDims = new String[0][0];
+
+ AggregationGroup aggGroup = new AggregationGroup();
+ aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+ aggGroup.setSelectRule(selectRule);
+
+ // Set up the HBase mapping
+ HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+ hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+
+ return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+ rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+ }
+
+ public static CubeDesc generateKylinCubeDesc(String tableName, int storageType,
+ List<DimensionDesc> dimensionDescList, List<MeasureDesc> measureDescList, RowKeyDesc rowKeyDesc,
+ AggregationGroup aggGroup, HBaseMappingDesc hBaseMapping, Map<String, String> overrideProperties) {
+ CubeDesc desc = new CubeDesc();
+ // Cube and model share a name derived from the Hive table, with '.' replaced by '_'
+ desc.setName(tableName.replace('.', '_'));
+ desc.setModelName(tableName.replace('.', '_'));
+ desc.setDescription("");
+ desc.setLastModified(0L);
+ desc.setDimensions(dimensionDescList);
+ desc.setMeasures(measureDescList);
+ desc.setRowkey(rowKeyDesc);
+ desc.setHbaseMapping(hBaseMapping);
+ desc.setNotifyList(Lists.<String> newArrayList());
+ desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
+ desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L }); // 1 day, 7 days, 28 days
+ desc.setEngineType(IEngineAware.ID_MR_V2);
+ desc.setStorageType(storageType);
+ desc.setAggregationGroups(Lists.newArrayList(aggGroup));
+ desc.getOverrideKylinProps().putAll(overrideProperties);
+ desc.setSignature(desc.calculateSignature());
+ desc.updateRandomUuid();
+ return desc;
+ }
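
A generated descriptor still has to be registered with the metadata store. A minimal sketch, assuming the standard CubeDescManager API (the import and the helper method name are assumptions; error handling reduced to a rethrow):

    // Sketch only: persist one generated descriptor through the metadata store.
    public static void saveMetricsJobCubeDesc(KylinConfig config, SinkTool sinkTool) {
        CubeDesc generated = generateKylinCubeDescForMetricsJob(config, sinkTool);
        try {
            CubeDescManager.getInstance(config).createCubeDesc(generated);
        } catch (java.io.IOException e) {
            throw new RuntimeException("failed to save cube desc " + generated.getName(), e);
        }
    }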
+
+ public static HBaseColumnFamilyDesc[] getHBaseColumnFamily(List<MeasureDesc> measureDescList) {
+ // Split measures by storage footprint: count-distinct and percentile
+ // sketches are large, everything else is small
+ List<String> normalMeasureList = Lists.newLinkedList();
+ List<String> largeMeasureList = Lists.newLinkedList();
+ for (MeasureDesc measureDesc : measureDescList) {
+ if (measureDesc.getFunction().isCountDistinct()
+ || measureDesc.getFunction().getExpression().equals(PercentileMeasureType.FUNC_PERCENTILE)) {
+ largeMeasureList.add(measureDesc.getName());
+ } else {
+ normalMeasureList.add(measureDesc.getName());
+ }
+ }
+ List<HBaseColumnFamilyDesc> columnFamilyDescList = Lists.newLinkedList();
+ int idx = 1;
+ // All small measures share one column family; each large measure gets its own
+ if (!normalMeasureList.isEmpty()) {
+ HBaseColumnDesc columnDesc = new HBaseColumnDesc();
+ columnDesc.setQualifier("M");
+ columnDesc.setMeasureRefs(normalMeasureList.toArray(new String[normalMeasureList.size()]));
+ HBaseColumnFamilyDesc columnFamilyDesc = new HBaseColumnFamilyDesc();
+ columnFamilyDesc.setName("F" + idx++);
+ columnFamilyDesc.setColumns(new HBaseColumnDesc[] { columnDesc });
+
+ columnFamilyDescList.add(columnFamilyDesc);
+ }
+ for (String largeMeasure : largeMeasureList) {
+ HBaseColumnDesc columnDesc = new HBaseColumnDesc();
+ columnDesc.setQualifier("M");
+ columnDesc.setMeasureRefs(new String[] { largeMeasure });
+ HBaseColumnFamilyDesc columnFamilyDesc = new HBaseColumnFamilyDesc();
+ columnFamilyDesc.setName("F" + idx++);
+ columnFamilyDesc.setColumns(new HBaseColumnDesc[] { columnDesc });
+
+ columnFamilyDescList.add(columnFamilyDesc);
+ }
+
+ return columnFamilyDescList.toArray(new HBaseColumnFamilyDesc[columnFamilyDescList.size()]);
+ }
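
To make the family split concrete, a small sketch using only the helpers defined below (the CALL_TIME column name is hypothetical):

    HBaseColumnFamilyDesc[] families = getHBaseColumnFamily(Lists.newArrayList(
            getMeasureCount(), // -> normal
            getMeasureSum("CALL_TIME", HiveTableCreator.HiveTypeEnum.HBIGINT.toString()), // -> normal
            getMeasurePercentile("CALL_TIME"))); // -> large
    // families[0]: "F1", holding _COUNT_ and CALL_TIME_SUM together
    // families[1]: "F2", holding CALL_TIME_PERCENTILE alone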
+
+ public static String[] getTimeHierarchy() {
+ return new String[] { TimePropertyEnum.YEAR.toString(), TimePropertyEnum.MONTH.toString(),
+ TimePropertyEnum.WEEK_BEGIN_DATE.toString(), TimePropertyEnum.DAY_DATE.toString() };
+ }
+
+ public static String[] refineColumnWithTable(String tableName, List<String> columns) {
+ String[] dimensions = new String[columns.size()];
+ for (int i = 0; i < dimensions.length; i++) {
+ dimensions[i] = tableName.substring(tableName.lastIndexOf(".") + 1) + "." + columns.get(i);
+ }
+ return dimensions;
+ }
+
+ public static String[] refineColumnWithTable(String tableName, String[] columns) {
+ String[] dimensions = new String[columns.length];
+ for (int i = 0; i < dimensions.length; i++) {
+ dimensions[i] = tableName.substring(tableName.lastIndexOf(".") + 1) + "." + columns[i];
+ }
+ return dimensions;
+ }
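
Both overloads simply prefix each column with the short table name (the part after the last dot); for example, with illustrative table and column names:

    // refineColumnWithTable("KYLIN.HIVE_METRICS_QUERY", new String[] { "USER", "PROJECT" })
    //   -> { "HIVE_METRICS_QUERY.USER", "HIVE_METRICS_QUERY.PROJECT" }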
+
+ public static int getTimeRowKeyColDesc(String tableName, RowKeyColDesc[] rowKeyColDescs) {
+ int idx = 0;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.DAY_DATE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.WEEK_BEGIN_DATE.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.MONTH.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.YEAR.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.TIME_HOUR.toString(), idx + 1);
+ idx++;
+ rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.TIME_MINUTE.toString(), idx + 1);
+ idx++;
+ return idx;
+ }
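
As used by every factory above, the six time columns occupy row key slots 0 through 5, finest grain first, and the returned value is the next free slot for subject-specific columns:

    // Slot layout produced above (1-based rowkey indexes in parentheses):
    //   0: DAY_DATE (1), 1: WEEK_BEGIN_DATE (2), 2: MONTH (3),
    //   3: YEAR (4), 4: TIME_HOUR (5), 5: TIME_MINUTE (6)
    // The caller continues filling slots from the returned index 6.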
+
+ public static RowKeyColDesc getRowKeyColDesc(String tableName, String column, int id) {
+ RowKeyColDesc rowKeyColDesc = new RowKeyColDesc();
+ rowKeyColDesc.setIndex(Integer.toString(id));
+ rowKeyColDesc.setColumn(tableName.substring(tableName.lastIndexOf(".") + 1) + "." + column);
+ rowKeyColDesc.setEncoding(DictionaryDimEnc.ENCODING_NAME);
+ rowKeyColDesc.setShardBy(false);
+ return rowKeyColDesc;
+ }
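
Every row key column is dictionary-encoded, not shard-by, and carries a 1-based index; for example (arguments illustrative):

    // getRowKeyColDesc("KYLIN.HIVE_METRICS_JOB", "USER", 7) yields a
    // dict-encoded row key column "HIVE_METRICS_JOB.USER" with index "7".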
+
+ public static DimensionDesc getDimensionDesc(String tableName, String dimension) {
+ DimensionDesc dimensionDesc = new DimensionDesc();
+ dimensionDesc.setName(dimension);
+ dimensionDesc.setTable(tableName.substring(tableName.lastIndexOf(".") + 1));
+ dimensionDesc.setColumn(dimension);
+ return dimensionDesc;
+ }
+
+ public static MeasureDesc getMeasureCount() {
+ ParameterDesc parameterDesc = new ParameterDesc();
+ parameterDesc.setValue("1");
+ parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_CONSTANT);
+
+ FunctionDesc function = new FunctionDesc();
+ function.setExpression(FunctionDesc.FUNC_COUNT);
+ function.setParameter(parameterDesc);
+ function.setReturnType(HiveTableCreator.HiveTypeEnum.HBIGINT.toString());
+
+ MeasureDesc result = new MeasureDesc();
+ result.setName("_COUNT_");
+ result.setFunction(function);
+ return result;
+ }
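
This is the conventional Kylin row-count measure: COUNT over the constant parameter "1". A sketch of what a caller gets back:

    MeasureDesc count = getMeasureCount();
    // count.getName()                     -> "_COUNT_"
    // count.getFunction().getExpression() -> FunctionDesc.FUNC_COUNT
    // count.getFunction().getReturnType() -> HiveTableCreator.HiveTypeEnum.HBIGINT.toString()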
+
+ public static MeasureDesc getMeasureSum(String column, String dataType) {
+ ParameterDesc parameterDesc = new ParameterDesc();
+ parameterDesc.setValue(column);
+ parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+
+ FunctionDesc function = new FunctionDesc();
+ function.setExpression(FunctionDesc.FUNC_SUM);
+ function.setParameter(parameterDesc);
+ // A double input is widened to decimal so large aggregated sums keep precision
+ function.setReturnType(dataType.equals(HiveTableCreator.HiveTypeEnum.HDOUBLE.toString())
+ ? HiveTableCreator.HiveTypeEnum.HDECIMAL.toString()
+ : dataType);
+
+ MeasureDesc result = new MeasureDesc();
+ result.setName(column + "_SUM");
+ result.setFunction(function);
+ return result;
+ }
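
Usage sketch (SOME_COST is a hypothetical column): the measure name is derived from the column, and a double input type is widened to decimal as noted above:

    MeasureDesc sum = getMeasureSum("SOME_COST", HiveTableCreator.HiveTypeEnum.HDOUBLE.toString());
    // sum.getName()                     -> "SOME_COST_SUM"
    // sum.getFunction().getReturnType() -> HiveTableCreator.HiveTypeEnum.HDECIMAL.toString()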
+
+ public static MeasureDesc getMeasureMax(String column, String dataType) {
+ ParameterDesc parameterDesc = new ParameterDesc();
+ parameterDesc.setValue(column);
+ parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+
+ FunctionDesc function = new FunctionDesc();
+ function.setExpression(FunctionDesc.FUNC_MAX);
+ function.setParameter(parameterDesc);
+ function.setReturnType(dataType);
+
+ MeasureDesc result = new MeasureDesc();
+ result.setName(column + "_MAX");
+ result.setFunction(function);
+ return result;
+ }
+
+ public static MeasureDesc getMeasureMin(String column, String dataType) {
+ ParameterDesc parameterDesc = new ParameterDesc();
+ parameterDesc.setValue(column);
+ parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+
+ FunctionDesc function = new FunctionDesc();
+ function.setExpression(FunctionDesc.FUNC_MIN);
+ function.setParameter(parameterDesc);
+ function.setReturnType(dataType);
+
+ MeasureDesc result = new MeasureDesc();
+ result.setName(column + "_MIN");
+ result.setFunction(function);
+ return result;
+ }
+
+ public static MeasureDesc getMeasureHLL(String column) {
+ ParameterDesc parameterDesc = new ParameterDesc();
+ parameterDesc.setValue(column);
+ parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+
+ FunctionDesc function = new FunctionDesc();
+ function.setExpression(FunctionDesc.FUNC_COUNT_DISTINCT);
+ function.setParameter(parameterDesc);
+ function.setReturnType("hllc12");
+
+ MeasureDesc result = new MeasureDesc();
+ result.setName(column + "_HLL");
+ result.setFunction(function);
+ return result;
+ }
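
The hllc12 return type asks for a HyperLogLog counter with precision 12, i.e. 2^12 = 4096 registers: an approximate COUNT DISTINCT with a fixed, small footprint per cuboid cell. Sketch (column name illustrative):

    // Approximate distinct hosts per cuboid cell:
    MeasureDesc distinctHosts = getMeasureHLL("HOST");
    // distinctHosts.getName() -> "HOST_HLL", return type -> "hllc12"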
+
+ public static MeasureDesc getMeasurePercentile(String column) {
+ ParameterDesc parameterDesc = new ParameterDesc();
+ parameterDesc.setValue(column);
+ parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+
+ FunctionDesc function = new FunctionDesc();
+ function.setExpression(PercentileMeasureType.FUNC_PERCENTILE);
+ function.setParameter(parameterDesc);
+ function.setReturnType("percentile(100)"); // 100 is the accuracy/compression parameter of the percentile sketch
+
+ MeasureDesc result = new MeasureDesc();
+ result.setName(column + "_PERCENTILE");
+ result.setFunction(function);
+ return result;
+ }
+ }