You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2017/12/31 13:20:02 UTC

[45/50] [abbrv] kylin git commit: Merge commit '5f2eff68d80ea6264d7590e14c052114c3cd6b74'

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
----------------------------------------------------------------------
diff --cc server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
index 9afec83,17f6b58..9789b70
--- a/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/QueryService.java
@@@ -102,11 -98,8 +102,12 @@@ import org.apache.kylin.rest.request.Pr
  import org.apache.kylin.rest.request.SQLRequest;
  import org.apache.kylin.rest.response.SQLResponse;
  import org.apache.kylin.rest.util.AclEvaluate;
 +import org.apache.kylin.rest.util.AclPermissionUtil;
+ import org.apache.kylin.rest.util.QueryRequestLimits;
  import org.apache.kylin.rest.util.TableauInterceptor;
 +import org.apache.kylin.shaded.htrace.org.apache.htrace.Sampler;
 +import org.apache.kylin.shaded.htrace.org.apache.htrace.Trace;
 +import org.apache.kylin.shaded.htrace.org.apache.htrace.TraceScope;
  import org.apache.kylin.storage.hybrid.HybridInstance;
  import org.slf4j.Logger;
  import org.slf4j.LoggerFactory;
@@@ -403,163 -403,121 +404,176 @@@ public class QueryService extends Basic
  
          final QueryContext queryContext = QueryContext.current();
  
 +        TraceScope scope = null;
-         if (KylinConfig.getInstanceFromEnv().isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) {
++        if (kylinConfig.isHtraceTracingEveryQuery() || BackdoorToggles.getHtraceEnabled()) {
 +            logger.info("Current query is under tracing");
 +            HtraceInit.init();
 +            scope = Trace.startSpan("query life cycle for " + queryContext.getQueryId(), Sampler.ALWAYS);
 +        }
 +        String traceUrl = getTraceUrl(scope);
 +
          try (SetThreadName ignored = new SetThreadName("Query %s", queryContext.getQueryId())) {
- 
++            long startTime = System.currentTimeMillis();
++            
++            SQLResponse sqlResponse = null;
              String sql = sqlRequest.getSql();
              String project = sqlRequest.getProject();
++            boolean isQueryCacheEnabled = isQueryCacheEnabled(kylinConfig);
              logger.info("Using project: " + project);
              logger.info("The original query:  " + sql);
  
 -            final boolean isSelect = QueryUtil.isSelectStatement(sql);
 -            final boolean isPushDownUpdateEnabled = kylinConfig.isPushDownEnabled()
 -                    && kylinConfig.isPushDownUpdateEnabled();
 -            final int maxConcurrentQuery = projectInstance.getConfig().getQueryConcurrentRunningThresholdForProject();
 +            sql = QueryUtil.removeCommentInSql(sql);
  
 -            if (!isSelect && !isPushDownUpdateEnabled) {
 -                logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
 -                throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
 -            }
 -
 -            SQLResponse sqlResponse = null;
 +            Pair<Boolean, String> result = TempStatementUtil.handleTempStatement(sql, kylinConfig);
- 
-             boolean isTempStatement = result.getFirst();
++            boolean isCreateTempStatement = result.getFirst();
 +            sql = result.getSecond();
 +            sqlRequest.setSql(sql);
  
-             final boolean isSelect = QueryUtil.isSelectStatement(sql);
- 
-             long startTime = System.currentTimeMillis();
- 
-             SQLResponse sqlResponse = null;
-             boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(),
-                     "query cache disabled in KylinConfig") && //
-                     checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
- 
-             if (queryCacheEnabled) {
 -            try {
 -                // Check project level query request concurrency limitation per query server
 -                if (!QueryRequestLimits.openQueryRequest(projectInstance.getName(), maxConcurrentQuery)) {
 -                    logger.warn(
 -                            "Directly return exception as too many concurrent query requests for project:" + project);
 -                    throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
++            // try some cheap executions
++            if (sqlResponse == null && isQueryInspect) {
++                sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql());
++            }
++            
++            if (sqlResponse == null && isCreateTempStatement) {
++                sqlResponse = new SQLResponse(null, null, 0, false, null);
++            }
++            
++            if (sqlResponse == null && isQueryCacheEnabled) {
 +                sqlResponse = searchQueryInCache(sqlRequest);
 +                Trace.addTimelineAnnotation("query cache searched");
-             } else {
-                 Trace.addTimelineAnnotation("query cache skip search");
 +            }
- 
-             try {
-                 if (null == sqlResponse) {
-                     if (isQueryInspect) {
-                         // set query sql to exception message string
-                         sqlResponse = new SQLResponse(null, null, 0, false, sqlRequest.getSql());
-                     } else if (isTempStatement) {
-                         sqlResponse = new SQLResponse(null, null, 0, false, null);
-                     } else if (isSelect) {
-                         sqlResponse = query(sqlRequest);
-                         Trace.addTimelineAnnotation("query almost done");
-                     } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) {
-                         sqlResponse = update(sqlRequest);
-                         Trace.addTimelineAnnotation("update query almost done");
-                     } else {
-                         logger.debug(
-                                 "Directly return exception as the sql is unsupported, and query pushdown is disabled");
-                         throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
-                     }
- 
-                     long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
-                     long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
-                     long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();
-                     sqlResponse.setDuration(System.currentTimeMillis() - startTime);
-                     logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
-                             String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
-                             String.valueOf(sqlResponse.getTotalScanCount()));
-                     if (checkCondition(queryCacheEnabled, "query cache is disabled") //
-                             && checkCondition(!sqlResponse.getIsException(), "query has exception") //
-                             && checkCondition(!(sqlResponse.isPushDown()
-                                     && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
-                                     "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
-                             && checkCondition(
-                                     sqlResponse.getDuration() > durationThreshold
-                                             || sqlResponse.getTotalScanCount() > scanCountThreshold
-                                             || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
-                                     "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",
-                                     sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(),
-                                     scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
-                             && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
-                                     "query response is too large: {} ({})", sqlResponse.getResults().size(),
-                                     kylinConfig.getLargeQueryThreshold())) {
-                         cacheManager.getCache(SUCCESS_QUERY_CACHE)
-                                 .put(new Element(sqlRequest.getCacheKey(), sqlResponse));
-                     }
-                     Trace.addTimelineAnnotation("response from execution");
- 
-                 } else {
-                     sqlResponse.setDuration(System.currentTimeMillis() - startTime);
-                     sqlResponse.setTotalScanCount(0);
-                     sqlResponse.setTotalScanBytes(0);
-                     Trace.addTimelineAnnotation("response from cache");
++            
++            // real execution if required
++            if (sqlResponse == null) {
++                try (QueryRequestLimits limit = new QueryRequestLimits(sqlRequest.getProject())) {
++                    sqlResponse = queryAndUpdateCache(sqlRequest, startTime, isQueryCacheEnabled);
                  }
++            } else {
++                Trace.addTimelineAnnotation("response without real execution");
++            }
  
 -                long startTime = System.currentTimeMillis();
 -
 -                // force clear the query context before a new query
 -                OLAPContext.clearThreadLocalContexts();
++            // check authorization before return, since the response may come from cache
++            if (!sqlResponse.getIsException())
 +                checkQueryAuth(sqlResponse, project);
  
-             } catch (Throwable e) { // calcite may throw AssertError
-                 logger.error("Exception while executing query", e);
-                 String errMsg = makeErrorMsgUserFriendly(e);
- 
-                 sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
-                 sqlResponse.setTotalScanCount(queryContext.getScannedRows());
-                 sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
- 
-                 if (queryCacheEnabled && e.getCause() != null
-                         && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
-                     Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
-                     exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
 -                boolean queryCacheEnabled = checkCondition(kylinConfig.isQueryCacheEnabled(),
 -                        "query cache disabled in KylinConfig") && //
 -                        checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
 -                if (queryCacheEnabled) {
 -                    sqlResponse = searchQueryInCache(sqlRequest);
--                }
-                 Trace.addTimelineAnnotation("error response");
-             }
- 
++            sqlResponse.setDuration(System.currentTimeMillis() - startTime);
 +            sqlResponse.setTraceUrl(traceUrl);
 +            logQuery(sqlRequest, sqlResponse);
 +            try {
 +                recordMetric(sqlRequest, sqlResponse);
 +            } catch (Throwable th) {
 +                logger.warn("Write metric error.", th);
 +            }
 +            if (sqlResponse.getIsException())
 +                throw new InternalErrorException(sqlResponse.getExceptionMessage());
  
 -                try {
 -                    if (null == sqlResponse) {
 -                        if (isSelect) {
 -                            sqlResponse = query(sqlRequest);
 -                        } else if (isPushDownUpdateEnabled) {
 -                            sqlResponse = update(sqlRequest);
 -                        }
 +            return sqlResponse;
  
 -                        long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
 -                        long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
 -                        long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();
 -                        sqlResponse.setDuration(System.currentTimeMillis() - startTime);
 -                        logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
 -                                String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
 -                                String.valueOf(sqlResponse.getTotalScanCount()));
 -                        if (checkCondition(queryCacheEnabled, "query cache is disabled") //
 -                                && checkCondition(!sqlResponse.getIsException(), "query has exception") //
 -                                && checkCondition(!(sqlResponse.isPushDown()
 -                                        && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
 -                                        "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
 -                                && checkCondition(
 -                                        sqlResponse.getDuration() > durationThreshold
 -                                                || sqlResponse.getTotalScanCount() > scanCountThreshold
 -                                                || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
 -                                        "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",
 -                                        sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(),
 -                                        scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
 -                                && checkCondition(
 -                                        sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
 -                                        "query response is too large: {} ({})", sqlResponse.getResults().size(),
 -                                        kylinConfig.getLargeQueryThreshold())) {
 -                            cacheManager.getCache(SUCCESS_QUERY_CACHE)
 -                                    .put(new Element(sqlRequest.getCacheKey(), sqlResponse));
 -                        }
 +        } finally {
 +            BackdoorToggles.cleanToggles();
 +            QueryContext.reset();
 +            if (scope != null) {
 +                scope.close();
 +            }
 +        }
 +    }
  
 -                    } else {
 -                        sqlResponse.setDuration(System.currentTimeMillis() - startTime);
 -                        sqlResponse.setTotalScanCount(0);
 -                        sqlResponse.setTotalScanBytes(0);
 -                    }
++    private SQLResponse queryAndUpdateCache(SQLRequest sqlRequest, long startTime, boolean queryCacheEnabled) {
++        KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
++        Message msg = MsgPicker.getMsg();
++        
++        SQLResponse sqlResponse = null;
++        try {
++            final boolean isSelect = QueryUtil.isSelectStatement(sqlRequest.getSql());
++            if (isSelect) {
++                sqlResponse = query(sqlRequest);
++                Trace.addTimelineAnnotation("query almost done");
++            } else if (kylinConfig.isPushDownEnabled() && kylinConfig.isPushDownUpdateEnabled()) {
++                sqlResponse = update(sqlRequest);
++                Trace.addTimelineAnnotation("update query almost done");
++            } else {
++                logger.debug("Directly return exception as the sql is unsupported, and query pushdown is disabled");
++                throw new BadRequestException(msg.getNOT_SUPPORTED_SQL());
++            }
+ 
 -                    checkQueryAuth(sqlResponse, project, secureEnabled);
++            long durationThreshold = kylinConfig.getQueryDurationCacheThreshold();
++            long scanCountThreshold = kylinConfig.getQueryScanCountCacheThreshold();
++            long scanBytesThreshold = kylinConfig.getQueryScanBytesCacheThreshold();
++            sqlResponse.setDuration(System.currentTimeMillis() - startTime);
++            logger.info("Stats of SQL response: isException: {}, duration: {}, total scan count {}", //
++                    String.valueOf(sqlResponse.getIsException()), String.valueOf(sqlResponse.getDuration()),
++                    String.valueOf(sqlResponse.getTotalScanCount()));
++            if (checkCondition(queryCacheEnabled, "query cache is disabled") //
++                    && checkCondition(!sqlResponse.getIsException(), "query has exception") //
++                    && checkCondition(
++                            !(sqlResponse.isPushDown()
++                                    && (isSelect == false || kylinConfig.isPushdownQueryCacheEnabled() == false)),
++                            "query is executed with pushdown, but it is non-select, or the cache for pushdown is disabled") //
++                    && checkCondition(
++                            sqlResponse.getDuration() > durationThreshold
++                                    || sqlResponse.getTotalScanCount() > scanCountThreshold
++                                    || sqlResponse.getTotalScanBytes() > scanBytesThreshold, //
++                            "query is too lightweight with duration: {} (threshold {}), scan count: {} (threshold {}), scan bytes: {} (threshold {})",
++                            sqlResponse.getDuration(), durationThreshold, sqlResponse.getTotalScanCount(),
++                            scanCountThreshold, sqlResponse.getTotalScanBytes(), scanBytesThreshold)
++                    && checkCondition(sqlResponse.getResults().size() < kylinConfig.getLargeQueryThreshold(),
++                            "query response is too large: {} ({})", sqlResponse.getResults().size(),
++                            kylinConfig.getLargeQueryThreshold())) {
++                cacheManager.getCache(SUCCESS_QUERY_CACHE).put(new Element(sqlRequest.getCacheKey(), sqlResponse));
++            }
++            Trace.addTimelineAnnotation("response from execution");
+ 
 -                } catch (Throwable e) { // calcite may throw AssertError
 -                    logger.error("Exception while executing query", e);
 -                    String errMsg = makeErrorMsgUserFriendly(e);
++        } catch (Throwable e) { // calcite may throw AssertError
++            logger.error("Exception while executing query", e);
++            String errMsg = makeErrorMsgUserFriendly(e);
+ 
 -                    sqlResponse = new SQLResponse(null, null, 0, true, errMsg);
 -                    sqlResponse.setThrowable(e.getCause() == null ? e : ExceptionUtils.getRootCause(e));
 -                    sqlResponse.setTotalScanCount(queryContext.getScannedRows());
 -                    sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
 -                    sqlResponse.setCubeSegmentStatisticsList(queryContext.getCubeSegmentStatisticsResultList());
++            sqlResponse = new SQLResponse(null, null, null, 0, true, errMsg, false, false);
++            QueryContext queryContext = QueryContext.current();
++            sqlResponse.setTotalScanCount(queryContext.getScannedRows());
++            sqlResponse.setTotalScanBytes(queryContext.getScannedBytes());
+ 
 -                    if (queryCacheEnabled && e.getCause() != null
 -                            && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
 -                        Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
 -                        exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
 -                    }
 -                }
 -            } finally {
 -                QueryRequestLimits.closeQueryRequest(projectInstance.getName(), maxConcurrentQuery);
++            if (queryCacheEnabled && e.getCause() != null
++                    && ExceptionUtils.getRootCause(e) instanceof ResourceLimitExceededException) {
++                Cache exceptionCache = cacheManager.getCache(EXCEPTION_QUERY_CACHE);
++                exceptionCache.put(new Element(sqlRequest.getCacheKey(), sqlResponse));
+             }
++            Trace.addTimelineAnnotation("error response");
++        }
++        return sqlResponse;
++    }
+ 
 -            logQuery(sqlRequest, sqlResponse);
++    private boolean isQueryCacheEnabled(KylinConfig kylinConfig) {
++        return checkCondition(kylinConfig.isQueryCacheEnabled(),
++                "query cache disabled in KylinConfig") && //
++                checkCondition(!BackdoorToggles.getDisableCache(), "query cache disabled in BackdoorToggles");
++    }
+ 
 -            QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
 -            QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse);
 +    protected void recordMetric(SQLRequest sqlRequest, SQLResponse sqlResponse) throws UnknownHostException {
 +        QueryMetricsFacade.updateMetrics(sqlRequest, sqlResponse);
 +        QueryMetrics2Facade.updateMetrics(sqlRequest, sqlResponse);
 +    }
  
 -            if (sqlResponse.getIsException())
 -                throw new InternalErrorException(sqlResponse.getExceptionMessage());
 +    private String getTraceUrl(TraceScope scope) {
 +        if (scope == null) {
 +            return null;
 +        }
  
 -            return sqlResponse;
 +        String hostname = System.getProperty("zipkin.collector-hostname");
 +        if (StringUtils.isEmpty(hostname)) {
 +            try {
 +                hostname = InetAddress.getLocalHost().getHostName();
 +            } catch (UnknownHostException e) {
 +                logger.debug("failed to get trace url due to " + e.getMessage());
 +                return null;
 +            }
 +        }
  
 -        } finally {
 -            BackdoorToggles.cleanToggles();
 -            QueryContext.reset();
 +        String port = System.getProperty("zipkin.web-ui-port");
 +        if (StringUtils.isEmpty(port)) {
 +            port = "9411";
          }
 +
 +        return "http://" + hostname + ":" + port + "/zipkin/traces/" + Long.toHexString(scope.getSpan().getTraceId());
      }
  
      private String getUserName() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
----------------------------------------------------------------------
diff --cc server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
index d218bc2,901ac46..8a6dd96
--- a/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/TableService.java
@@@ -111,12 -115,27 +111,11 @@@ public class TableService extends Basic
  
      public String[] loadHiveTablesToProject(String[] tables, String project) throws Exception {
          aclEvaluate.checkProjectAdminPermission(project);
 -        // de-dup
 -        SetMultimap<String, String> db2tables = LinkedHashMultimap.create();
 -        for (String fullTableName : tables) {
 -            String[] parts = HadoopUtil.parseHiveTableName(fullTableName);
 -            db2tables.put(parts[0], parts[1]);
 -        }
 -
 -        // load all tables first
 -        List<Pair<TableDesc, TableExtDesc>> allMeta = Lists.newArrayList();
 -        ISourceMetadataExplorer explr = SourceFactory.getDefaultSource().getSourceMetadataExplorer();
 -        for (Map.Entry<String, String> entry : db2tables.entries()) {
 -            Pair<TableDesc, TableExtDesc> pair = explr.loadTableMetadata(entry.getKey(), entry.getValue(), project);
 -            TableDesc tableDesc = pair.getFirst();
 -            Preconditions.checkState(tableDesc.getDatabase().equals(entry.getKey().toUpperCase()));
 -            Preconditions.checkState(tableDesc.getName().equals(entry.getValue().toUpperCase()));
 -            Preconditions.checkState(tableDesc.getIdentity().equals(entry.getKey().toUpperCase() + "." + entry.getValue().toUpperCase()));
 -            TableExtDesc extDesc = pair.getSecond();
 -            Preconditions.checkState(tableDesc.getIdentity().equals(extDesc.getIdentity()));
 -            allMeta.add(pair);
 -        }
 +        List<Pair<TableDesc, TableExtDesc>> allMeta = getAllMeta(tables, project);
 +        return loadHiveTablesToProject(project, allMeta);
 +    }
  
 +    String[] loadHiveTablesToProject(String project, List<Pair<TableDesc, TableExtDesc>> allMeta) throws Exception {
- 
          // do schema check
          TableMetadataManager metaMgr = getTableManager();
          CubeManager cubeMgr = getCubeManager();

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
----------------------------------------------------------------------
diff --cc server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
index 0000000,cddaa12..eb8eee8
mode 000000,100644..100644
--- a/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/QueryRequestLimits.java
@@@ -1,0 -1,90 +1,121 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package org.apache.kylin.rest.util;
+ 
+ import java.util.concurrent.ExecutionException;
+ import java.util.concurrent.TimeUnit;
+ import java.util.concurrent.atomic.AtomicInteger;
+ 
++import org.apache.kylin.common.KylinConfig;
++import org.apache.kylin.metadata.project.ProjectInstance;
++import org.apache.kylin.metadata.project.ProjectManager;
++import org.apache.kylin.rest.exception.BadRequestException;
++import org.apache.kylin.rest.msg.Message;
++import org.apache.kylin.rest.msg.MsgPicker;
+ import org.slf4j.Logger;
+ import org.slf4j.LoggerFactory;
+ 
+ import com.google.common.cache.CacheBuilder;
+ import com.google.common.cache.CacheLoader;
+ import com.google.common.cache.LoadingCache;
+ import com.google.common.cache.RemovalListener;
+ import com.google.common.cache.RemovalNotification;
+ 
 -public class QueryRequestLimits {
++public class QueryRequestLimits implements AutoCloseable {
+     private static final Logger logger = LoggerFactory.getLogger(QueryRequestLimits.class);
+ 
+     private static LoadingCache<String, AtomicInteger> runningStats = CacheBuilder.newBuilder()
+             .removalListener(new RemovalListener<String, AtomicInteger>() {
+                 @Override
+                 public void onRemoval(RemovalNotification<String, AtomicInteger> notification) {
+                     logger.info("Current running query number " + notification.getValue().get() + " for project "
+                             + notification.getKey() + " is removed due to " + notification.getCause());
+                 }
+             }).expireAfterWrite(1, TimeUnit.DAYS).build(new CacheLoader<String, AtomicInteger>() {
+                 @Override
+                 public AtomicInteger load(String s) throws Exception {
+                     return new AtomicInteger(0);
+                 }
+             });
+ 
 -    public static boolean openQueryRequest(String project, int maxConcurrentQuery) {
++    static boolean openQueryRequest(String project, int maxConcurrentQuery) {
+         if (maxConcurrentQuery == 0) {
+             return true;
+         }
+         try {
+             AtomicInteger nRunningQueries = runningStats.get(project);
+             for (;;) {
+                 int nRunning = nRunningQueries.get();
+                 if (nRunning < maxConcurrentQuery) {
+                     if (nRunningQueries.compareAndSet(nRunning, nRunning + 1)) {
+                         return true;
+                     }
+                 } else {
+                     return false;
+                 }
+             }
+         } catch (ExecutionException e) {
+             throw new RuntimeException(e);
+         }
+     }
+ 
 -    public static void closeQueryRequest(String project, int maxConcurrentQuery) {
++    static void closeQueryRequest(String project, int maxConcurrentQuery) {
+         if (maxConcurrentQuery == 0) {
+             return;
+         }
+         AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
+         if (nRunningQueries != null) {
+             nRunningQueries.decrementAndGet();
+         }
+     }
+ 
+     public static Integer getCurrentRunningQuery(String project) {
+         AtomicInteger nRunningQueries = runningStats.getIfPresent(project);
+         if (nRunningQueries != null) {
+             return nRunningQueries.get();
+         } else {
+             return null;
+         }
+     }
++    
++    // ============================================================================
++
++    final private String project;
++    final private int maxConcurrentQuery;
++    
++    public QueryRequestLimits(String project) {
++        this.project = project;
++        
++        ProjectManager mgr = ProjectManager.getInstance(KylinConfig.getInstanceFromEnv());
++        ProjectInstance prj = mgr.getProject(project);
++        this.maxConcurrentQuery = prj.getConfig().getQueryConcurrentRunningThresholdForProject();
++        
++        boolean ok = openQueryRequest(project, maxConcurrentQuery);
++        if (!ok) {
++            Message msg = MsgPicker.getMsg();
++            logger.warn("Directly return exception as too many concurrent query requests for project:" + project);
++            throw new BadRequestException(msg.getQUERY_TOO_MANY_RUNNING());
++        }
++    }
++
++    @Override
++    public void close() {
++        closeQueryRequest(project, maxConcurrentQuery);
++    }
+ }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java
----------------------------------------------------------------------
diff --cc server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java
index ada78cd,7e513ae..be20419
--- a/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/util/ValidateUtil.java
@@@ -169,4 -108,31 +169,5 @@@ public class ValidateUtil 
          }
          return cols;
      }
+ 
 -    public Set<String> getAllUsers(String project) throws IOException {
 -        Set<String> allUsers = new HashSet<>();
 -        // add users that is global admin
 -        for (ManagedUser managedUser : userService.listUsers()) {
 -            if (managedUser.getAuthorities().contains(new SimpleGrantedAuthority(Constant.ROLE_ADMIN))) {
 -                allUsers.add(managedUser.getUsername());
 -            }
 -        }
 -
 -        // add users that has project permission
 -        ProjectInstance prj = projectService.getProjectManager().getProject(project);
 -        AclEntity ae = accessService.getAclEntity("ProjectInstance", prj.getUuid());
 -        Acl acl = accessService.getAcl(ae);
 -        if (acl != null && acl.getEntries() != null) {
 -            for (AccessControlEntry ace : acl.getEntries()) {
 -                allUsers.add(((PrincipalSid) ace.getSid()).getPrincipal());
 -            }
 -        }
 -        return allUsers;
 -    }
 -
 -    public void validateArgs(String... args) {
 -        for (String arg : args) {
 -            Preconditions.checkState(!StringUtils.isEmpty(arg));
 -        }
 -    }
  }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/main/resources/kylinSecurity.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/controller/CubeControllerTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java
----------------------------------------------------------------------
diff --cc server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java
index fc97eff,414a241..528b18e
--- a/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/security/QueryWithTableACLTest.java
@@@ -23,10 -23,9 +23,10 @@@ import java.sql.SQLException
  
  import org.apache.kylin.common.KylinConfig;
  import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 +import org.apache.kylin.metadata.MetadataConstants;
  import org.apache.kylin.metadata.acl.TableACLManager;
  import org.apache.kylin.query.security.AccessDeniedException;
- import org.apache.kylin.query.security.QuerACLTestUtil;
+ import org.apache.kylin.query.security.QueryACLTestUtil;
  import org.hamcrest.CoreMatchers;
  import org.junit.After;
  import org.junit.Before;
@@@ -59,26 -55,26 +59,26 @@@ public class QueryWithTableACLTest exte
  
      @Test
      public void testFailQuery() throws SQLException, IOException {
-         QuerACLTestUtil.setUser(MODELER);
-         QuerACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE");
+         QueryACLTestUtil.setUser(MODELER);
+         QueryACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE");
  
-         QuerACLTestUtil.setUser(ADMIN);
+         QueryACLTestUtil.setUser(ADMIN);
 -        TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE);
 +        TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE, MetadataConstants.TYPE_USER);
          thrown.expectCause(CoreMatchers.isA(AccessDeniedException.class));
          thrown.expectMessage(CoreMatchers.containsString("Query failed.Access table:DEFAULT.STREAMING_TABLE denied"));
-         QuerACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE");
+         QueryACLTestUtil.mockQuery(PROJECT, "select * from STREAMING_TABLE");
      }
  
      @Test
      public void testFailQueryWithCountStar() throws SQLException, IOException {
-         QuerACLTestUtil.setUser(MODELER);
-         QuerACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE");
+         QueryACLTestUtil.setUser(MODELER);
+         QueryACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE");
  
-         QuerACLTestUtil.setUser(ADMIN);
+         QueryACLTestUtil.setUser(ADMIN);
 -        TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE);
 +        TableACLManager.getInstance(KylinConfig.getInstanceFromEnv()).addTableACL(PROJECT, "ADMIN", STREAMING_TABLE, MetadataConstants.TYPE_USER);
          thrown.expectCause(CoreMatchers.isA(AccessDeniedException.class));
          thrown.expectMessage(CoreMatchers.containsString("Query failed.Access table:DEFAULT.STREAMING_TABLE denied"));
-         QuerACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE");
+         QueryACLTestUtil.mockQuery(PROJECT, "select count(*) from STREAMING_TABLE");
      }
  
      @After

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java
----------------------------------------------------------------------
diff --cc server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java
index 75b8f21,0000000..3ee14e0
mode 100644,000000..100644
--- a/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java
+++ b/server/src/test/java/org/apache/kylin/rest/service/AdminServiceTest.java
@@@ -1,72 -1,0 +1,62 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + * 
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + * 
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 +*/
 +
 +package org.apache.kylin.rest.service;
 +
 +import java.io.File;
 +import java.io.IOException;
 +
 +import org.apache.commons.io.FileUtils;
 +import org.apache.kylin.common.KylinConfig;
 +import org.junit.Assert;
 +import org.junit.Test;
 +import org.springframework.beans.factory.annotation.Autowired;
 +import org.springframework.beans.factory.annotation.Qualifier;
 +
 +/**
 + * 
 + */
 +public class AdminServiceTest extends ServiceTestBase {
 +
 +    @Autowired
 +    @Qualifier("adminService")
 +    private AdminService adminService;
 +
 +    @Test
 +    public void testGetPublicConfig() throws IOException {
 +        //set ../examples/test_metadata/kylin.properties empty
 +        File file = FileUtils.getFile(LOCALMETA_TEMP_DATA + "kylin.properties");
 +        FileUtils.deleteQuietly(file);
 +        FileUtils.touch(file);
 +        String path = Thread.currentThread().getContextClassLoader().getResource("kylin.properties").getPath();
 +        KylinConfig.setKylinConfigThreadLocal(KylinConfig.createInstanceFromUri(path));
-         String expected = "kylin.web.link-streaming-guide=http://kylin.apache.org/\n" +
-                 "kylin.web.contact-mail=\n" +
-                 "kylin.query.cache-enabled=true\n" +
-                 "kylin.web.link-diagnostic=\n" +
-                 "kylin.web.help.length=4\n" +
-                 "kylin.web.timezone=GMT+8\n" +
-                 "kylin.server.external-acl-provider=\n" +
-                 "kylin.storage.default=2\n" +
-                 "kylin.web.help=\n" +
-                 "kylin.web.export-allow-other=true\n" +
-                 "kylin.web.link-hadoop=\n" +
-                 "kylin.web.hide-measures=\n" +
-                 "kylin.htrace.show-gui-trace-toggle=false\n" +
-                 "kylin.web.export-allow-admin=true\n" +
-                 "kylin.env=QA\n" +
-                 "kylin.web.hive-limit=20\n" +
-                 "kylin.engine.default=2\n" +
-                 "kylin.web.help.3=onboard|Cube Design Tutorial|\n" +
-                 "kylin.web.help.2=tableau|Tableau Guide|\n" +
-                 "kylin.web.help.1=odbc|ODBC Driver|\n" +
-                 "kylin.web.help.0=start|Getting Started|\n" +
-                 "kylin.security.profile=testing\n";
-         Assert.assertEquals(expected, adminService.getPublicConfig());
++        
++        String publicConfig = adminService.getPublicConfig();
++        
++        Assert.assertFalse(publicConfig.contains("kylin.metadata.data-model-manager-impl"));
++        Assert.assertFalse(publicConfig.contains("kylin.dictionary.use-forest-trie"));
++        Assert.assertFalse(publicConfig.contains("kylin.cube.segment-advisor"));
++        Assert.assertFalse(publicConfig.contains("kylin.job.use-remote-cli"));
++        Assert.assertFalse(publicConfig.contains("kylin.job.scheduler.provider"));
++        Assert.assertFalse(publicConfig.contains("kylin.engine.mr.job-jar"));
++        Assert.assertFalse(publicConfig.contains("kylin.engine.spark.sanity-check-enabled"));
++        Assert.assertFalse(publicConfig.contains("kylin.storage.provider"));
++        Assert.assertFalse(publicConfig.contains("kylin.query.convert-create-table-to-with"));
++        Assert.assertFalse(publicConfig.contains("kylin.server.init-tasks"));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/server/src/test/java/org/apache/kylin/rest/service/CacheServiceTest.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
----------------------------------------------------------------------
diff --cc source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 0b23121,c700d82..e4564d0
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@@ -223,8 -224,10 +224,10 @@@ public class HiveMRInput implements IMR
                              .append("CREATE EXTERNAL TABLE IF NOT EXISTS " + intermediate + " LIKE " + identity + "\n");
                      createIntermediateTableHql.append("LOCATION '" + jobWorkingDir + "/" + intermediate + "';\n");
                      createIntermediateTableHql
+                             .append("ALTER TABLE " + intermediate + " SET TBLPROPERTIES('auto.purge'='true');\n");
+                     createIntermediateTableHql
 -                            .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + ";\n");
 -                    hiveCmdBuilder.addStatement(createIntermediateTableHql.toString());
 +                            .append("INSERT OVERWRITE TABLE " + intermediate + " SELECT * FROM " + identity + "\n");
 +                    hiveCmdBuilder.addStatementWithRedistributeBy(createIntermediateTableHql);
                      hiveViewIntermediateTables = hiveViewIntermediateTables + intermediate + ";";
                  }
              }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
----------------------------------------------------------------------
diff --cc source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 59780e6,d05f14e..7fc26de
--- a/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@@ -133,7 -134,7 +134,7 @@@ public class JdbcHiveMRInput extends Hi
              if (partitionDesc.isPartitioned()) {
                  partCol = partitionDesc.getPartitionDateColumn();//tablename.colname
                  partitionString = partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                     flatDesc.getSegment(), flatDesc.getSegRange());
 -                        flatDesc.getSegRange());
++                        flatDesc.getSegment(), flatDesc.getSegRange());
              }
  
              String splitTable;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
----------------------------------------------------------------------
diff --cc source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
index 37bf8ff,fe5812b..c045ff7
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/job/MergeOffsetStep.java
@@@ -63,20 -63,20 +63,20 @@@ public class MergeOffsetStep extends Ab
          final CubeSegment first = mergingSegs.get(0);
          final CubeSegment last = mergingSegs.get(mergingSegs.size() - 1);
  
 -        segment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
 -        segment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
 -        segment.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
 +        segCopy.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
 +        segCopy.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
 +        segCopy.setSourcePartitionOffsetEnd(last.getSourcePartitionOffsetEnd());
  
 -        segment.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd()));
 +        segCopy.setTSRange(new TSRange(mergingSegs.getTSStart(), mergingSegs.getTSEnd()));
  
 -        CubeUpdate cubeBuilder = new CubeUpdate(cube);
 -        cubeBuilder.setToUpdateSegs(segment);
 +        CubeUpdate update = new CubeUpdate(cubeCopy);
 +        update.setToUpdateSegs(segCopy);
          try {
 -            cubeManager.updateCube(cubeBuilder);
 -            return new ExecuteResult();
 +            cubeManager.updateCube(update);
-             return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed");
++            return ExecuteResult.createSucceed();
          } catch (IOException e) {
              logger.error("fail to update cube segment offset", e);
-             return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage());
+             return ExecuteResult.createError(e);
          }
      }
  

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeHBaseEndpointRPC.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
----------------------------------------------------------------------
diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 073f183,838112f..68aa172
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@@ -28,9 -30,14 +29,9 @@@ import org.apache.commons.cli.Options
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.FileSystem;
  import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.hadoop.hbase.KeyValue;
 -import org.apache.hadoop.io.IOUtils;
  import org.apache.hadoop.io.NullWritable;
  import org.apache.hadoop.io.SequenceFile;
 -import org.apache.hadoop.io.Text;
 -import org.apache.hadoop.io.Writable;
 -import org.apache.hadoop.util.ReflectionUtils;
 -import org.apache.hadoop.util.StringUtils;
  import org.apache.hadoop.util.ToolRunner;
  import org.apache.kylin.common.KylinConfig;
  import org.apache.kylin.common.util.Bytes;
@@@ -71,6 -79,8 +73,7 @@@ public class CreateHTableJob extends Ab
          options.addOption(OPTION_CUBE_NAME);
          options.addOption(OPTION_SEGMENT_ID);
          options.addOption(OPTION_PARTITION_FILE_PATH);
 -        options.addOption(OPTION_STATISTICS_ENABLED);
+         options.addOption(OPTION_CUBOID_MODE);
          parseOptions(options, args);
  
          partitionFilePath = new Path(getOptionValue(OPTION_PARTITION_FILE_PATH));
@@@ -81,12 -92,31 +84,29 @@@
          cubeDesc = cube.getDescriptor();
          kylinConfig = cube.getConfig();
          segmentID = getOptionValue(OPTION_SEGMENT_ID);
+         cuboidModeName = getOptionValue(OPTION_CUBOID_MODE);
          CubeSegment cubeSegment = cube.getSegmentById(segmentID);
  
 -        Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
 -
          byte[][] splitKeys;
-         final Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
 -        if (statsEnabled) {
 -            Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, null, kylinConfig).getCuboidSizeMap();
 -            Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
 -            if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
 -                Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
 -                for (Long cuboid : buildingCuboids) {
 -                    Double cuboidSize = cuboidSizeMap.get(cuboid);
 -                    if (cuboidSize == null) {
 -                        logger.warn(cuboid + "cuboid's size is null will replace by 0");
 -                        cuboidSize = 0.0;
 -                    }
 -                    optimizedCuboidSizeMap.put(cuboid, cuboidSize);
++        Map<Long, Double> cuboidSizeMap = new CubeStatsReader(cubeSegment, kylinConfig).getCuboidSizeMap();
++        
++        // for cube planner, will keep cuboidSizeMap unchanged if cube planner is disabled
++        Set<Long> buildingCuboids = cube.getCuboidsByMode(cuboidModeName);
++        if (buildingCuboids != null && !buildingCuboids.isEmpty()) {
++            Map<Long, Double> optimizedCuboidSizeMap = Maps.newHashMapWithExpectedSize(buildingCuboids.size());
++            for (Long cuboid : buildingCuboids) {
++                Double cuboidSize = cuboidSizeMap.get(cuboid);
++                if (cuboidSize == null) {
++                    logger.warn(cuboid + "cuboid's size is null will replace by 0");
++                    cuboidSize = 0.0;
+                 }
 -                cuboidSizeMap = optimizedCuboidSizeMap;
++                optimizedCuboidSizeMap.put(cuboid, cuboidSize);
+             }
 -            splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment, partitionFilePath.getParent());
 -        } else {
 -            splitKeys = getRegionSplits(conf, partitionFilePath);
++            cuboidSizeMap = optimizedCuboidSizeMap;
+         }
++        
 +        splitKeys = getRegionSplitsFromCuboidStatistics(cuboidSizeMap, kylinConfig, cubeSegment,
 +                partitionFilePath.getParent());
  
          CubeHTableUtil.createHTable(cubeSegment, splitKeys);
          return 0;

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
----------------------------------------------------------------------
diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
index 06c0923,8817cb2..ea372d9
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMROutput2Transition.java
@@@ -185,4 -199,30 +199,30 @@@ public class HBaseMROutput2Transition i
              throw new IllegalStateException("No merging segment's last build job ID equals " + jobID);
          }
      }
+ 
+     public IMRBatchOptimizeOutputSide2 getBatchOptimizeOutputSide(final CubeSegment seg) {
+         return new IMRBatchOptimizeOutputSide2() {
+             HBaseMRSteps steps = new HBaseMRSteps(seg);
+ 
+             @Override
+             public void addStepPhase2_CreateHTable(DefaultChainedExecutable jobFlow) {
 -                jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId(), CuboidModeEnum.RECOMMEND));
++                jobFlow.addTask(steps.createCreateHTableStep(jobFlow.getId(), CuboidModeEnum.RECOMMEND));
+             }
+ 
+             @Override
+             public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
+                 jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
+                 jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
+             }
+ 
+             public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+                 steps.addOptimizeGarbageCollectionSteps(jobFlow);
+             }
+ 
+             @Override
+             public void addStepPhase5_Cleanup(DefaultChainedExecutable jobFlow) {
+                 steps.addCheckpointGarbageCollectionSteps(jobFlow);
+             }
+         };
+     }
  }

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
----------------------------------------------------------------------
diff --cc storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
index 3dd61d8,67c94ad..99cc9a7
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseMRSteps.java
@@@ -74,14 -76,31 +76,18 @@@ public class HBaseMRSteps extends JobBu
          return rowkeyDistributionStep;
      }
  
- 
      public HadoopShellExecutable createCreateHTableStep(String jobId) {
 -        return createCreateHTableStep(jobId, false);
++        return createCreateHTableStep(jobId, CuboidModeEnum.CURRENT);
+     }
 -
 -    public HadoopShellExecutable createCreateHTableStepWithStats(String jobId) {
 -        return createCreateHTableStep(jobId, true);
 -    }
 -
 -    public HadoopShellExecutable createCreateHTableStepWithStats(String jobId, CuboidModeEnum cuboidMode) {
 -        return createCreateHTableStep(jobId, true, cuboidMode);
 -    }
 -
 -    private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats) {
 -        return createCreateHTableStep(jobId, withStats, CuboidModeEnum.CURRENT);
 -    }
 -
 -    private HadoopShellExecutable createCreateHTableStep(String jobId, boolean withStats, CuboidModeEnum cuboidMode) {
++    
++    public HadoopShellExecutable createCreateHTableStep(String jobId, CuboidModeEnum cuboidMode) {
          HadoopShellExecutable createHtableStep = new HadoopShellExecutable();
          createHtableStep.setName(ExecutableConstants.STEP_NAME_CREATE_HBASE_TABLE);
          StringBuilder cmd = new StringBuilder();
          appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
          appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
          appendExecCmdParameters(cmd, BatchConstants.ARG_PARTITION, getRowkeyDistributionOutputPath(jobId) + "/part-r-00000");
 -        appendExecCmdParameters(cmd, BatchConstants.ARG_STATS_ENABLED, String.valueOf(withStats));
+         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBOID_MODE, cuboidMode.toString());
  
          createHtableStep.setJobParams(cmd.toString());
          createHtableStep.setJobClass(CreateHTableJob.class);

http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
----------------------------------------------------------------------
diff --cc tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index 0000000,2be381c..666d23a
mode 000000,100644..100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@@ -1,0 -1,673 +1,656 @@@
+ /*
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+ */
+ 
+ package org.apache.kylin.tool.metrics.systemcube;
+ 
 -import java.io.ByteArrayOutputStream;
 -import java.io.DataOutputStream;
+ import java.util.List;
+ import java.util.Map;
+ import java.util.Set;
+ 
+ import org.apache.kylin.common.KylinConfig;
+ import org.apache.kylin.common.util.Pair;
 -import org.apache.kylin.cube.CubeDescManager;
+ import org.apache.kylin.cube.model.AggregationGroup;
+ import org.apache.kylin.cube.model.CubeDesc;
+ import org.apache.kylin.cube.model.DimensionDesc;
+ import org.apache.kylin.cube.model.HBaseColumnDesc;
+ import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+ import org.apache.kylin.cube.model.HBaseMappingDesc;
+ import org.apache.kylin.cube.model.RowKeyColDesc;
+ import org.apache.kylin.cube.model.RowKeyDesc;
+ import org.apache.kylin.cube.model.SelectRule;
+ import org.apache.kylin.dimension.DictionaryDimEnc;
+ import org.apache.kylin.job.constant.JobStatusEnum;
+ import org.apache.kylin.measure.percentile.PercentileMeasureType;
+ import org.apache.kylin.metadata.model.FunctionDesc;
+ import org.apache.kylin.metadata.model.IEngineAware;
+ import org.apache.kylin.metadata.model.MeasureDesc;
+ import org.apache.kylin.metadata.model.ParameterDesc;
+ import org.apache.kylin.metrics.lib.SinkTool;
+ import org.apache.kylin.metrics.lib.impl.RecordEvent;
+ import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
+ import org.apache.kylin.metrics.property.JobPropertyEnum;
+ import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
+ import org.apache.kylin.metrics.property.QueryPropertyEnum;
+ import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
 -import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool;
+ 
+ import com.google.common.collect.Lists;
+ import com.google.common.collect.Maps;
+ import com.google.common.collect.Sets;
+ 
+ public class CubeDescCreator {
+ 
 -    public static void main(String[] args) throws Exception {
 -        //        KylinConfig.setSandboxEnvIfPossible();
 -        KylinConfig config = KylinConfig.getInstanceFromEnv();
 -
 -        CubeDesc kylinCubeDesc = generateKylinCubeDescForMetricsQuery(config, new HiveSinkTool());
 -        ByteArrayOutputStream buf = new ByteArrayOutputStream();
 -        DataOutputStream dout = new DataOutputStream(buf);
 -        CubeDescManager.CUBE_DESC_SERIALIZER.serialize(kylinCubeDesc, dout);
 -        dout.close();
 -        buf.close();
 -        System.out.println(buf.toString());
 -    }
 -
+     public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig config, SinkTool sinkTool) {
+         String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQuery());
+ 
+         //Set for dimensions
+         List<String> dimensions = ModelCreator.getDimensionsForMetricsQuery();
+         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+ 
+         List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+         for (String dimensionName : dimensions) {
+             dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+         }
+ 
+         //Set for measures
+         List<String> measures = ModelCreator.getMeasuresForMetricsQuery();
+         measures.remove(QueryPropertyEnum.ID_CODE.toString());
+         List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1);
+ 
+         List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQuery();
+         Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
+         for (Pair<String, String> entry : measureTypeList) {
+             measureTypeMap.put(entry.getKey(), entry.getValue());
+         }
+         measureDescList.add(getMeasureCount());
+         measureDescList.add(getMeasureMin(QueryPropertyEnum.TIME_COST.toString(),
+                 measureTypeMap.get(QueryPropertyEnum.TIME_COST.toString())));
+         for (String measure : measures) {
+             measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
+             measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
+         }
+         measureDescList.add(getMeasureHLL(QueryPropertyEnum.ID_CODE.toString()));
+         measureDescList.add(getMeasurePercentile(QueryPropertyEnum.TIME_COST.toString()));
+ 
+         //Set for row key
+         RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.USER.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.PROJECT.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.REALIZATION_TYPE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.EXCEPTION.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryPropertyEnum.TYPE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
+         idx++;
+ 
+         RowKeyDesc rowKeyDesc = new RowKeyDesc();
+         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+ 
+         //Set for aggregation group
+         String[][] hierarchy_dims = new String[2][];
+         hierarchy_dims[0] = getTimeHierarchy();
+         hierarchy_dims[1] = new String[2];
+         hierarchy_dims[1][0] = QueryPropertyEnum.REALIZATION_TYPE.toString();
+         hierarchy_dims[1][1] = QueryPropertyEnum.REALIZATION.toString();
+         for (int i = 0; i < hierarchy_dims.length; i++) {
+             hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+         }
+ 
+         SelectRule selectRule = new SelectRule();
+         selectRule.mandatoryDims = new String[0];
+         selectRule.hierarchyDims = hierarchy_dims;
+         selectRule.jointDims = new String[0][0];
+ 
+         AggregationGroup aggGroup = new AggregationGroup();
+         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+         aggGroup.setSelectRule(selectRule);
+ 
+         //Set for hbase mapping
+         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+ 
+         return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+                 rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+     }
+ 
+     public static CubeDesc generateKylinCubeDescForMetricsQueryCube(KylinConfig config, SinkTool sinkTool) {
+         String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube());
+ 
+         //Set for dimensions
+         List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryCube();
+         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+         dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
+         dimensions.remove(QueryCubePropertyEnum.PROJECT.toString());
+ 
+         List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+         for (String dimensionName : dimensions) {
+             dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+         }
+ 
+         //Set for measures
+         List<String> measures = ModelCreator.getMeasuresForMetricsQueryCube();
+         List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2);
+ 
+         List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryCube();
+         Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
+         for (Pair<String, String> entry : measureTypeList) {
+             measureTypeMap.put(entry.getKey(), entry.getValue());
+         }
+         measureDescList.add(getMeasureCount());
+         for (String measure : measures) {
+             measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
+             if (!measure.equals(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString())) {
+                 measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
+             }
+         }
+ 
+         //Set for row key
+         RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.SEGMENT.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_SOURCE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.CUBOID_TARGET.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.FILTER_MASK.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_MATCH.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryCubePropertyEnum.IF_SUCCESS.toString(), idx + 1);
+         idx++;
+ 
+         RowKeyDesc rowKeyDesc = new RowKeyDesc();
+         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+ 
+         //Set for aggregation group
+         String[] mandatory_dims = new String[] { QueryCubePropertyEnum.CUBE.toString() };
+         mandatory_dims = refineColumnWithTable(tableName, mandatory_dims);
+ 
+         String[][] hierarchy_dims = new String[1][];
+         hierarchy_dims[0] = getTimeHierarchy();
+         for (int i = 0; i < hierarchy_dims.length; i++) {
+             hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+         }
+ 
+         String[][] joint_dims = new String[1][];
+         joint_dims[0] = new String[] { QueryCubePropertyEnum.CUBOID_SOURCE.toString(),
+                 QueryCubePropertyEnum.CUBOID_TARGET.toString() };
+         for (int i = 0; i < joint_dims.length; i++) {
+             joint_dims[i] = refineColumnWithTable(tableName, joint_dims[i]);
+         }
+ 
+         SelectRule selectRule = new SelectRule();
+         selectRule.mandatoryDims = mandatory_dims;
+         selectRule.hierarchyDims = hierarchy_dims;
+         selectRule.jointDims = joint_dims;
+ 
+         AggregationGroup aggGroup = new AggregationGroup();
+         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+         aggGroup.setSelectRule(selectRule);
+ 
+         //Set for hbase mapping
+         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+ 
+         return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+                 rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+     }
+ 
+     public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig config, SinkTool sinkTool) {
+         String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall());
+ 
+         //Set for dimensions
+         List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryRPC();
+         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+ 
+         List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+         for (String dimensionName : dimensions) {
+             dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+         }
+ 
+         //Set for measures
+         List<String> measures = ModelCreator.getMeasuresForMetricsQueryRPC();
+         List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(measures.size() * 2 + 1 + 1);
+ 
+         List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsQueryRPC();
+         Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
+         for (Pair<String, String> entry : measureTypeList) {
+             measureTypeMap.put(entry.getKey(), entry.getValue());
+         }
+         measureDescList.add(getMeasureCount());
+         for (String measure : measures) {
+             measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
+             measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
+         }
+         measureDescList.add(getMeasurePercentile(QueryRPCPropertyEnum.CALL_TIME.toString()));
+ 
+         //Set for row key
+         RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.PROJECT.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.REALIZATION.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.RPC_SERVER.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, RecordEvent.RecordReserveKeyEnum.HOST.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, QueryRPCPropertyEnum.EXCEPTION.toString(), idx + 1);
+         idx++;
+ 
+         RowKeyDesc rowKeyDesc = new RowKeyDesc();
+         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+ 
+         //Set for aggregation group
+         String[][] hierarchy_dims = new String[1][];
+         hierarchy_dims[0] = getTimeHierarchy();
+         for (int i = 0; i < hierarchy_dims.length; i++) {
+             hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+         }
+ 
+         SelectRule selectRule = new SelectRule();
+         selectRule.mandatoryDims = new String[0];
+         selectRule.hierarchyDims = hierarchy_dims;
+         selectRule.jointDims = new String[0][0];
+ 
+         AggregationGroup aggGroup = new AggregationGroup();
+         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+         aggGroup.setSelectRule(selectRule);
+ 
+         //Set for hbase mapping
+         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+ 
+         return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+                 rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+     }
+ 
+     public static CubeDesc generateKylinCubeDescForMetricsJob(KylinConfig config, SinkTool sinkTool) {
+         String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJob());
+ 
+         //Set for dimensions
+         List<String> dimensions = ModelCreator.getDimensionsForMetricsJob();
+         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+         dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
+ 
+         List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+         for (String dimensionName : dimensions) {
+             dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+         }
+ 
+         //Set for measures
+         List<String> measures = ModelCreator.getMeasuresForMetricsJob();
+         List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize((measures.size() - 4) * 3 + 1 + 1 + 4);
+ 
+         Set<String> stepDuration = Sets.newHashSet();
+         stepDuration.add(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString());
+         stepDuration.add(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString());
+         stepDuration.add(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString());
+         stepDuration.add(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString());
+ 
+         List<Pair<String, String>> measureTypeList = HiveTableCreator.getHiveColumnsForMetricsJob();
+         Map<String, String> measureTypeMap = Maps.newHashMapWithExpectedSize(measureTypeList.size());
+         for (Pair<String, String> entry : measureTypeList) {
+             measureTypeMap.put(entry.getKey(), entry.getValue());
+         }
+         measureDescList.add(getMeasureCount());
+         for (String measure : measures) {
+             measureDescList.add(getMeasureSum(measure, measureTypeMap.get(measure)));
+             measureDescList.add(getMeasureMax(measure, measureTypeMap.get(measure)));
+             if (!stepDuration.contains(measure)) {
+                 measureDescList.add(getMeasureMin(measure, measureTypeMap.get(measure)));
+             }
+         }
+         measureDescList.add(getMeasurePercentile(JobPropertyEnum.BUILD_DURATION.toString()));
+ 
+         //Set for row key
+         RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.USER.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.PROJECT.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.CUBE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.ALGORITHM.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.TYPE.toString(), idx + 1);
+         idx++;
+ 
+         RowKeyDesc rowKeyDesc = new RowKeyDesc();
+         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+ 
+         //Set for aggregation group
+         String[][] hierarchy_dims = new String[1][];
+         hierarchy_dims[0] = getTimeHierarchy();
+         for (int i = 0; i < hierarchy_dims.length; i++) {
+             hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+         }
+ 
+         SelectRule selectRule = new SelectRule();
+         selectRule.mandatoryDims = new String[0];
+         selectRule.hierarchyDims = hierarchy_dims;
+         selectRule.jointDims = new String[0][0];
+ 
+         AggregationGroup aggGroup = new AggregationGroup();
+         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+         aggGroup.setSelectRule(selectRule);
+ 
+         //Set for hbase mapping
+         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+ 
+         return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+                 rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+     }
+ 
+     public static CubeDesc generateKylinCubeDescForMetricsJobException(KylinConfig config, SinkTool sinkTool) {
+         String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJobException());
+ 
+         //Set for dimensions
+         List<String> dimensions = ModelCreator.getDimensionsForMetricsJobException();
+         dimensions.remove(TimePropertyEnum.DAY_TIME.toString());
+         dimensions.remove(RecordEvent.RecordReserveKeyEnum.TIME.toString());
+         dimensions.remove(RecordEvent.RecordReserveKeyEnum.HOST.toString());
+ 
+         List<DimensionDesc> dimensionDescList = Lists.newArrayListWithExpectedSize(dimensions.size());
+         for (String dimensionName : dimensions) {
+             dimensionDescList.add(getDimensionDesc(tableName, dimensionName));
+         }
+ 
+         //Set for measures
+         List<String> measures = ModelCreator.getMeasuresForMetricsJobException();
+         measures.remove(JobPropertyEnum.ID_CODE.toString());
+         List<MeasureDesc> measureDescList = Lists.newArrayListWithExpectedSize(1);
+ 
+         measureDescList.add(getMeasureCount());
+ 
+         //Set for row key
+         RowKeyColDesc[] rowKeyColDescs = new RowKeyColDesc[dimensionDescList.size()];
+         int idx = getTimeRowKeyColDesc(tableName, rowKeyColDescs);
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.USER.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.PROJECT.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.CUBE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.ALGORITHM.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.TYPE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, JobPropertyEnum.EXCEPTION.toString(), idx + 1);
+         idx++;
+ 
+         RowKeyDesc rowKeyDesc = new RowKeyDesc();
+         rowKeyDesc.setRowkeyColumns(rowKeyColDescs);
+ 
+         //Set for aggregation group
+         String[][] hierarchy_dims = new String[1][];
+         hierarchy_dims[0] = getTimeHierarchy();
+         for (int i = 0; i < hierarchy_dims.length; i++) {
+             hierarchy_dims[i] = refineColumnWithTable(tableName, hierarchy_dims[i]);
+         }
+ 
+         SelectRule selectRule = new SelectRule();
+         selectRule.mandatoryDims = new String[0];
+         selectRule.hierarchyDims = hierarchy_dims;
+         selectRule.jointDims = new String[0][0];
+ 
+         AggregationGroup aggGroup = new AggregationGroup();
+         aggGroup.setIncludes(refineColumnWithTable(tableName, dimensions));
+         aggGroup.setSelectRule(selectRule);
+ 
+         //Set for hbase mapping
+         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
+         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
+ 
+         return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList,
+                 rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties());
+     }
+ 
+     public static CubeDesc generateKylinCubeDesc(String tableName, int storageType,
+             List<DimensionDesc> dimensionDescList, List<MeasureDesc> measureDescList, RowKeyDesc rowKeyDesc,
+             AggregationGroup aggGroup, HBaseMappingDesc hBaseMapping, Map<String, String> overrideProperties) {
+         CubeDesc desc = new CubeDesc();
+         desc.setName(tableName.replace('.', '_'));
+         desc.setModelName(tableName.replace('.', '_'));
+         desc.setDescription("");
+         desc.setLastModified(0L);
+         desc.setDimensions(dimensionDescList);
+         desc.setMeasures(measureDescList);
+         desc.setRowkey(rowKeyDesc);
+         desc.setHbaseMapping(hBaseMapping);
+         desc.setNotifyList(Lists.<String> newArrayList());
+         desc.setStatusNeedNotify(Lists.newArrayList(JobStatusEnum.ERROR.toString()));
+         desc.setAutoMergeTimeRanges(new long[] { 86400000L, 604800000L, 2419200000L });
+         desc.setEngineType(IEngineAware.ID_MR_V2);
+         desc.setStorageType(storageType);
+         desc.setAggregationGroups(Lists.newArrayList(aggGroup));
+         desc.getOverrideKylinProps().putAll(overrideProperties);
+         desc.setSignature(desc.calculateSignature());
+         desc.updateRandomUuid();
+         return desc;
+     }
+ 
+     public static HBaseColumnFamilyDesc[] getHBaseColumnFamily(List<MeasureDesc> measureDescList) {
+         List<String> normalMeasureList = Lists.newLinkedList();
+         List<String> largeMeasureList = Lists.newLinkedList();
+         for (MeasureDesc measureDesc : measureDescList) {
+             if (measureDesc.getFunction().isCountDistinct()
+                     || measureDesc.getFunction().getExpression().equals(PercentileMeasureType.FUNC_PERCENTILE)) {
+                 largeMeasureList.add(measureDesc.getName());
+             } else {
+                 normalMeasureList.add(measureDesc.getName());
+             }
+         }
+         List<HBaseColumnFamilyDesc> columnFamilyDescList = Lists.newLinkedList();
+         int idx = 1;
+         if (normalMeasureList.size() > 0) {
+             HBaseColumnDesc columnDesc = new HBaseColumnDesc();
+             columnDesc.setQualifier("M");
+             columnDesc.setMeasureRefs(normalMeasureList.toArray(new String[normalMeasureList.size()]));
+             HBaseColumnFamilyDesc columnFamilyDesc = new HBaseColumnFamilyDesc();
+             columnFamilyDesc.setName("F" + idx++);
+             columnFamilyDesc.setColumns(new HBaseColumnDesc[] { columnDesc });
+ 
+             columnFamilyDescList.add(columnFamilyDesc);
+         }
+         for (String largeMeasure : largeMeasureList) {
+             HBaseColumnDesc columnDesc = new HBaseColumnDesc();
+             columnDesc.setQualifier("M");
+             columnDesc.setMeasureRefs(new String[] { largeMeasure });
+             HBaseColumnFamilyDesc columnFamilyDesc = new HBaseColumnFamilyDesc();
+             columnFamilyDesc.setName("F" + idx++);
+             columnFamilyDesc.setColumns(new HBaseColumnDesc[] { columnDesc });
+ 
+             columnFamilyDescList.add(columnFamilyDesc);
+         }
+ 
+         return columnFamilyDescList.toArray(new HBaseColumnFamilyDesc[columnFamilyDescList.size()]);
+     }
+ 
+     public static String[] getTimeHierarchy() {
+         String[] result = new String[4];
+         result[0] = TimePropertyEnum.YEAR.toString();
+         result[1] = TimePropertyEnum.MONTH.toString();
+         result[2] = TimePropertyEnum.WEEK_BEGIN_DATE.toString();
+         result[3] = TimePropertyEnum.DAY_DATE.toString();
+         return result;
+     }
+ 
+     public static String[] refineColumnWithTable(String tableName, List<String> columns) {
+         String[] dimensions = new String[columns.size()];
+         for (int i = 0; i < dimensions.length; i++) {
+             dimensions[i] = tableName.substring(tableName.lastIndexOf(".") + 1) + "." + columns.get(i);
+         }
+         return dimensions;
+     }
+ 
+     public static String[] refineColumnWithTable(String tableName, String[] columns) {
+         String[] dimensions = new String[columns.length];
+         for (int i = 0; i < dimensions.length; i++) {
+             dimensions[i] = tableName.substring(tableName.lastIndexOf(".") + 1) + "." + columns[i];
+         }
+         return dimensions;
+     }
+ 
+     public static int getTimeRowKeyColDesc(String tableName, RowKeyColDesc[] rowKeyColDescs) {
+         int idx = 0;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.DAY_DATE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.WEEK_BEGIN_DATE.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.MONTH.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.YEAR.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.TIME_HOUR.toString(), idx + 1);
+         idx++;
+         rowKeyColDescs[idx] = getRowKeyColDesc(tableName, TimePropertyEnum.TIME_MINUTE.toString(), idx + 1);
+         idx++;
+         return idx;
+     }
+ 
+     public static RowKeyColDesc getRowKeyColDesc(String tableName, String column, int id) {
+         RowKeyColDesc rowKeyColDesc = new RowKeyColDesc();
+         rowKeyColDesc.setIndex(Integer.toString(id));
+         rowKeyColDesc.setColumn(tableName.substring(tableName.lastIndexOf(".") + 1) + "." + column);
+         rowKeyColDesc.setEncoding(DictionaryDimEnc.ENCODING_NAME);
+         rowKeyColDesc.setShardBy(false);
+         return rowKeyColDesc;
+     }
+ 
+     public static DimensionDesc getDimensionDesc(String tableName, String dimension) {
+         DimensionDesc dimensionDesc = new DimensionDesc();
+         dimensionDesc.setName(dimension);
+         dimensionDesc.setTable(tableName.substring(tableName.lastIndexOf(".") + 1));
+         dimensionDesc.setColumn(dimension);
+         return dimensionDesc;
+     }
+ 
+     public static MeasureDesc getMeasureCount() {
+         ParameterDesc parameterDesc = new ParameterDesc();
+         parameterDesc.setValue("1");
+         parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_CONSTANT);
+ 
+         FunctionDesc function = new FunctionDesc();
+         function.setExpression(FunctionDesc.FUNC_COUNT);
+         function.setParameter(parameterDesc);
+         function.setReturnType(HiveTableCreator.HiveTypeEnum.HBIGINT.toString());
+ 
+         MeasureDesc result = new MeasureDesc();
+         result.setName("_COUNT_");
+         result.setFunction(function);
+         return result;
+     }
+ 
+     public static MeasureDesc getMeasureSum(String column, String dataType) {
+         ParameterDesc parameterDesc = new ParameterDesc();
+         parameterDesc.setValue(column);
+         parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+ 
+         FunctionDesc function = new FunctionDesc();
+         function.setExpression(FunctionDesc.FUNC_SUM);
+         function.setParameter(parameterDesc);
+         function.setReturnType(dataType.equals(HiveTableCreator.HiveTypeEnum.HDOUBLE.toString())
+                 ? HiveTableCreator.HiveTypeEnum.HDECIMAL.toString()
+                 : dataType);
+ 
+         MeasureDesc result = new MeasureDesc();
+         result.setName(column + "_SUM");
+         result.setFunction(function);
+         return result;
+     }
+ 
+     public static MeasureDesc getMeasureMax(String column, String dataType) {
+         ParameterDesc parameterDesc = new ParameterDesc();
+         parameterDesc.setValue(column);
+         parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+ 
+         FunctionDesc function = new FunctionDesc();
+         function.setExpression(FunctionDesc.FUNC_MAX);
+         function.setParameter(parameterDesc);
+         function.setReturnType(dataType);
+ 
+         MeasureDesc result = new MeasureDesc();
+         result.setName(column + "_MAX");
+         result.setFunction(function);
+         return result;
+     }
+ 
+     public static MeasureDesc getMeasureMin(String column, String dataType) {
+         ParameterDesc parameterDesc = new ParameterDesc();
+         parameterDesc.setValue(column);
+         parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+ 
+         FunctionDesc function = new FunctionDesc();
+         function.setExpression(FunctionDesc.FUNC_MIN);
+         function.setParameter(parameterDesc);
+         function.setReturnType(dataType);
+ 
+         MeasureDesc result = new MeasureDesc();
+         result.setName(column + "_MIN");
+         result.setFunction(function);
+         return result;
+     }
+ 
+     public static MeasureDesc getMeasureHLL(String column) {
+         ParameterDesc parameterDesc = new ParameterDesc();
+         parameterDesc.setValue(column);
+         parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+ 
+         FunctionDesc function = new FunctionDesc();
+         function.setExpression(FunctionDesc.FUNC_COUNT_DISTINCT);
+         function.setParameter(parameterDesc);
+         function.setReturnType("hllc12");
+ 
+         MeasureDesc result = new MeasureDesc();
+         result.setName(column + "_HLL");
+         result.setFunction(function);
+         return result;
+     }
+ 
+     public static MeasureDesc getMeasurePercentile(String column) {
+         ParameterDesc parameterDesc = new ParameterDesc();
+         parameterDesc.setValue(column);
+         parameterDesc.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
+ 
+         FunctionDesc function = new FunctionDesc();
+         function.setExpression(PercentileMeasureType.FUNC_PERCENTILE);
+         function.setParameter(parameterDesc);
+         function.setReturnType("percentile(100)");
+ 
+         MeasureDesc result = new MeasureDesc();
+         result.setName(column + "_PERCENTILE");
+         result.setFunction(function);
+         return result;
+     }
+ }