You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/12/12 01:00:05 UTC

[kylin] branch master updated: KYLIN-3597 Improve code smell

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b31a91  KYLIN-3597 Improve code smell
0b31a91 is described below

commit 0b31a911302c0754ae6da5745d127dcaa336a467
Author: hit-lacus <hi...@126.com>
AuthorDate: Tue Dec 11 16:52:14 2018 +0800

    KYLIN-3597 Improve code smell
---
 .../org/apache/kylin/cube/CubeDescManager.java     | 26 ++++++------
 .../kylin/cube/cli/DictionaryGeneratorCLI.java     | 15 ++++---
 .../kylin/cube/inmemcubing/DoggedCubeBuilder.java  | 21 ++--------
 .../kylin/cube/inmemcubing/InMemCubeBuilder.java   | 46 +++++++++++-----------
 .../measure/percentile/PercentileSerializer.java   |  1 +
 .../query/relnode/visitor/TupleFilterVisitor.java  |  5 +--
 .../org/apache/kylin/query/util/QueryUtil.java     | 11 ++++--
 .../source/kafka/util/KafkaSampleProducer.java     | 11 ++++--
 .../kylin/storage/hbase/util/RowCounterCLI.java    |  7 ++--
 9 files changed, 69 insertions(+), 74 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
index 4fb9522..a7459c0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeDescManager.java
@@ -21,6 +21,7 @@ package org.apache.kylin.cube;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 
 import org.apache.commons.lang3.StringUtils;
@@ -60,12 +61,15 @@ public class CubeDescManager {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeDescManager.class);
 
-    public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<CubeDesc>(CubeDesc.class);
+    public static final Serializer<CubeDesc> CUBE_DESC_SERIALIZER = new JsonSerializer<>(CubeDesc.class);
     
     public static CubeDescManager getInstance(KylinConfig config) {
         return config.getManager(CubeDescManager.class);
     }
 
+    static final String BROKEN_CUBE_MSG = "Broken cube desc %s";
+    static final String CUBE_SHOULD_NOT_BE_DRAFT_MSG = "CubeDesc '%s' must not be a draft";
+
     // called by reflection
     static CubeDescManager newInstance(KylinConfig config) throws IOException {
         return new CubeDescManager(config);
@@ -84,20 +88,20 @@ public class CubeDescManager {
     private AutoReadWriteLock descMapLock = new AutoReadWriteLock();
 
     private CubeDescManager(KylinConfig cfg) throws IOException {
-        logger.info("Initializing CubeDescManager with config " + cfg);
+        logger.info("Initializing CubeDescManager with config {}", cfg);
         this.config = cfg;
-        this.cubeDescMap = new CaseInsensitiveStringCache<CubeDesc>(config, "cube_desc");
+        this.cubeDescMap = new CaseInsensitiveStringCache<>(config, "cube_desc");
         this.crud = new CachedCrudAssist<CubeDesc>(getStore(), ResourceStore.CUBE_DESC_RESOURCE_ROOT, CubeDesc.class,
                 cubeDescMap) {
             @Override
             protected CubeDesc initEntityAfterReload(CubeDesc cubeDesc, String resourceName) {
                 if (cubeDesc.isDraft())
-                    throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft");
+                    throw new IllegalArgumentException(String.format(Locale.ROOT, CUBE_SHOULD_NOT_BE_DRAFT_MSG, cubeDesc.getName()));
 
                 try {
                     cubeDesc.init(config);
                 } catch (Exception e) {
-                    logger.warn("Broken cube desc " + cubeDesc.resourceName(), e);
+                    logger.warn(String.format(Locale.ROOT, BROKEN_CUBE_MSG, cubeDesc.resourceName()), e);
                     cubeDesc.addError(e.toString());
                 }
                 return cubeDesc;
@@ -175,10 +179,6 @@ public class CubeDescManager {
 
     /**
      * Create a new CubeDesc
-     * 
-     * @param cubeDesc
-     * @return
-     * @throws IOException
      */
     public CubeDesc createCubeDesc(CubeDesc cubeDesc) throws IOException {
         try (AutoLock lock = descMapLock.lockForWrite()) {
@@ -187,7 +187,7 @@ public class CubeDescManager {
             if (cubeDescMap.containsKey(cubeDesc.getName()))
                 throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' already exists");
             if (cubeDesc.isDraft())
-                throw new IllegalArgumentException("CubeDesc '" + cubeDesc.getName() + "' must not be a draft");
+                throw new IllegalArgumentException(String.format(Locale.ROOT, CUBE_SHOULD_NOT_BE_DRAFT_MSG, cubeDesc.getName()));
 
             try {
                 cubeDesc.init(config);
@@ -219,10 +219,6 @@ public class CubeDescManager {
 
     /**
      * Update CubeDesc with the input. Broadcast the event into cluster
-     * 
-     * @param desc
-     * @return
-     * @throws IOException
      */
     public CubeDesc updateCubeDesc(CubeDesc desc) throws IOException {
         try (AutoLock lock = descMapLock.lockForWrite()) {
@@ -233,7 +229,7 @@ public class CubeDescManager {
             if (!cubeDescMap.containsKey(name))
                 throw new IllegalArgumentException("CubeDesc '" + name + "' does not exist.");
             if (desc.isDraft())
-                throw new IllegalArgumentException("CubeDesc '" + desc.getName() + "' must not be a draft");
+                throw new IllegalArgumentException(String.format(Locale.ROOT, CUBE_SHOULD_NOT_BE_DRAFT_MSG, desc.getName()));
 
             try {
                 desc.init(config);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
index 1b5cf63..1bec02a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cli/DictionaryGeneratorCLI.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.cube.cli;
 
 import java.io.IOException;
+import java.util.Locale;
 import java.util.Set;
 
 import org.apache.hadoop.io.IOUtils;
@@ -42,6 +43,8 @@ import com.google.common.collect.Sets;
 
 public class DictionaryGeneratorCLI {
 
+    private DictionaryGeneratorCLI(){}
+
     private static final Logger logger = LoggerFactory.getLogger(DictionaryGeneratorCLI.class);
 
     public static void processSegment(KylinConfig config, String cubeName, String segmentID, String uuid,
@@ -58,7 +61,7 @@ public class DictionaryGeneratorCLI {
 
         // dictionary
         for (TblColRef col : cubeSeg.getCubeDesc().getAllColumnsNeedDictionaryBuilt()) {
-            logger.info("Building dictionary for " + col);
+            logger.info("Building dictionary for {}", col);
             IReadableTable inpTable = factTableValueProvider.getDistinctValuesFor(col);
 
             Dictionary<String> preBuiltDict = null;
@@ -67,10 +70,10 @@ public class DictionaryGeneratorCLI {
             }
 
             if (preBuiltDict != null) {
-                logger.debug("Dict for '" + col.getName() + "' has already been built, save it");
+                logger.debug("Dict for '{}' has already been built, save it", col.getName());
                 cubeMgr.saveDictionary(cubeSeg, col, inpTable, preBuiltDict);
             } else {
-                logger.debug("Dict for '" + col.getName() + "' not pre-built, build it from " + inpTable.toString());
+                logger.debug("Dict for '{}' not pre-built, build it from {}", col.getName(), inpTable);
                 cubeMgr.buildDictionary(cubeSeg, col, inpTable);
             }
         }
@@ -90,14 +93,14 @@ public class DictionaryGeneratorCLI {
         }
 
         for (String tableIdentity : toSnapshot) {
-            logger.info("Building snapshot of " + tableIdentity);
+            logger.info("Building snapshot of {}", tableIdentity);
             cubeMgr.buildSnapshotTable(cubeSeg, tableIdentity, uuid);
         }
 
         CubeInstance updatedCube = cubeMgr.getCube(cubeSeg.getCubeInstance().getName());
         cubeSeg = updatedCube.getSegmentById(cubeSeg.getUuid());
         for (TableRef lookup : toCheckLookup) {
-            logger.info("Checking snapshot of " + lookup);
+            logger.info("Checking snapshot of {}", lookup);
             try {
                 JoinDesc join = cubeSeg.getModel().getJoinsTree().getJoinByPKSide(lookup);
                 ILookupTable table = cubeMgr.getLookupTable(cubeSeg, join);
@@ -105,7 +108,7 @@ public class DictionaryGeneratorCLI {
                     IOUtils.closeStream(table);
                 }
             } catch (Throwable th) {
-                throw new RuntimeException("Checking snapshot of " + lookup + " failed.", th);
+                throw new RuntimeException(String.format(Locale.ROOT, "Checking snapshot of %s failed.", lookup), th);
             }
         }
     }
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index 06e4a5d..8368051 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -93,7 +93,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                     splits.add(last);
 
                     last.start();
-                    logger.info("Split #" + splits.size() + " kickoff");
+                    logger.info("Split #{} kickoff", splits.size());
 
                     // Build splits sequentially
                     last.join();
@@ -101,7 +101,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                     checkException(splits);
                 }
 
-                logger.info("Dogged Cube Build splits complete, took " + (System.currentTimeMillis() - start) + " ms");
+                logger.info("Dogged Cube Build splits complete, took {} ms", (System.currentTimeMillis() - start));
 
                 merger.mergeAndOutput(splits, output);
 
@@ -116,7 +116,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
             } finally {
                 output.close();
                 closeGirdTables(splits);
-                logger.info("Dogged Cube Build end, totally took " + (System.currentTimeMillis() - start) + " ms");
+                logger.info("Dogged Cube Build end, totally took {} ms", (System.currentTimeMillis() - start));
                 ensureExit(splits);
                 logger.info("Dogged Cube Build return");
             }
@@ -173,20 +173,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
                 if (split.exception != null)
                     errors.add(split.exception);
             }
-
-            if (errors.isEmpty()) {
-                return;
-            } else if (errors.size() == 1) {
-                Throwable t = errors.get(0);
-                if (t instanceof IOException)
-                    throw (IOException) t;
-                else
-                    throw new IOException(t);
-            } else {
-                for (Throwable t : errors)
-                    logger.error("Exception during in-mem cube build", t);
-                throw new IOException(errors.size() + " exceptions during in-mem cube build, cause set to the first, check log for more", errors.get(0));
-            }
+            InMemCubeBuilder.processErrors(errors);
         }
     }
 
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index ef61ce9..e0bdb20 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -111,9 +111,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                 new CubeDimEncMap(cubeDesc, dictionaryMap)
         );
 
-        // Below several store implementation are very similar in performance. The ConcurrentDiskStore is the simplest.
-        // MemDiskStore store = new MemDiskStore(info, memBudget == null ? MemoryBudgetController.ZERO_BUDGET : memBudget);
-        // MemDiskStore store = new MemDiskStore(info, MemoryBudgetController.ZERO_BUDGET);
         ConcurrentDiskStore store = new ConcurrentDiskStore(info);
 
         GridTable gridTable = new GridTable(info, store);
@@ -156,7 +153,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private <T> void build(RecordConsumeBlockingQueueController<T> input, ICuboidCollector collector)
             throws IOException {
         long startTime = System.currentTimeMillis();
-        logger.info("In Mem Cube Build start, " + cubeDesc.getName());
+        logger.info("In Mem Cube Build start, {}", cubeDesc.getName());
 
         baseCuboidMemTracker = new MemoryWaterLevel();
         baseCuboidMemTracker.markLow();
@@ -186,7 +183,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         join(taskThreads);
 
         long endTime = System.currentTimeMillis();
-        logger.info("In Mem Cube Build end, " + cubeDesc.getName() + ", takes " + (endTime - startTime) + " ms");
+        logger.info("In Mem Cube Build end, {}, takes {} ms", cubeDesc.getName(), (endTime - startTime));
 
         throwExceptionIfAny();
     }
@@ -216,12 +213,16 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     }
 
     private void throwExceptionIfAny() throws IOException {
-        ArrayList<Throwable> errors = new ArrayList<Throwable>();
+        ArrayList<Throwable> errors = new ArrayList<>();
         for (int i = 0; i < taskThreadCount; i++) {
             Throwable t = taskThreadExceptions[i];
             if (t != null)
                 errors.add(t);
         }
+        processErrors(errors);
+    }
+
+    static void processErrors(List<Throwable> errors) throws IOException{
         if (errors.isEmpty()) {
             return;
         } else if (errors.size() == 1) {
@@ -313,26 +314,26 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private void makeMemoryBudget() {
         baseResult.aggrCacheMB = Math.max(baseCuboidMemTracker.getEstimateMB(), 10); // 10 MB at minimal
-        logger.debug("Base cuboid aggr cache is " + baseResult.aggrCacheMB + " MB");
+        logger.debug("Base cuboid aggr cache is {} MB", baseResult.aggrCacheMB);
         int systemAvailMB = MemoryBudgetController.gcAndGetSystemAvailMB();
-        logger.debug("System avail " + systemAvailMB + " MB");
+        logger.debug("System avail {} MB", systemAvailMB);
         int reserve = reserveMemoryMB;
-        logger.debug("Reserve " + reserve + " MB for system basics");
+        logger.debug("Reserve {} MB for system basics", reserve);
 
         int budget = systemAvailMB - reserve;
         if (budget < baseResult.aggrCacheMB) {
             // make sure we have base aggr cache as minimal
             budget = baseResult.aggrCacheMB;
-            logger.warn("System avail memory (" + systemAvailMB + " MB) is less than base aggr cache (" + baseResult.aggrCacheMB + " MB) + minimal reservation (" + reserve + " MB), consider increase JVM heap -Xmx");
+            logger.warn("System avail memory ({} MB) is less than base aggr cache ({} MB) + minimal reservation ({} MB), consider increase JVM heap -Xmx", systemAvailMB, baseResult.aggrCacheMB, reserve);
         }
 
-        logger.debug("Memory Budget is " + budget + " MB");
+        logger.debug("Memory Budget is {} MB", budget);
         memBudget = new MemoryBudgetController(budget);
     }
 
     private <T> CuboidResult createBaseCuboid(RecordConsumeBlockingQueueController<T> input) throws IOException {
         long startTime = System.currentTimeMillis();
-        logger.info("Calculating base cuboid " + baseCuboidId);
+        logger.info("Calculating base cuboid {}", baseCuboidId);
 
         GridTable baseCuboid = newGridTableByCuboidID(baseCuboidId);
         GTBuilder baseBuilder = baseCuboid.rebuild();
@@ -358,10 +359,10 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         }
 
         long timeSpent = System.currentTimeMillis() - startTime;
-        logger.info("Cuboid " + baseCuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
+        logger.info("Cuboid {} has {} rows, build takes {}ms", baseCuboidId, count, timeSpent);
 
         int mbEstimateBaseAggrCache = (int) (aggregationScanner.getEstimateSizeOfAggrCache() / MemoryBudgetController.ONE_MB);
-        logger.info("Wild estimate of base aggr cache is " + mbEstimateBaseAggrCache + " MB");
+        logger.info("Wild estimate of base aggr cache is {} MB", mbEstimateBaseAggrCache);
 
         return updateCuboidResult(baseCuboidId, baseCuboid, count, timeSpent, 0, input.inputConverterUnit.ifChange());
     }
@@ -427,7 +428,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                 aggrMask[i] = !measureDescs[i].getFunction().getMeasureType().onlyAggrInBaseCuboid();
 
                 if (!aggrMask[i]) {
-                    logger.info(measureDescs[i].toString() + " doesn't need aggregation.");
+                    logger.info("{} doesn't need aggregation.", measureDescs[i]);
                 }
             }
             scanner.setAggrMask(aggrMask);
@@ -438,7 +439,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private CuboidResult scanAndAggregateGridTable(GridTable gridTable, long parentId, long cuboidId, ImmutableBitSet aggregationColumns, ImmutableBitSet measureColumns) throws IOException {
         long startTime = System.currentTimeMillis();
-        logger.info("Calculating cuboid " + cuboidId);
+        logger.info("Calculating cuboid {}", cuboidId);
 
         GTAggregateScanner scanner = prepareGTAggregationScanner(gridTable, parentId, cuboidId, aggregationColumns, measureColumns);
         GridTable newGridTable = newGridTableByCuboidID(cuboidId);
@@ -458,16 +459,13 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
                 builder.write(newRecord);
             }
 
-            //long t = System.currentTimeMillis();
-            //sanityCheck(parentId, cuboidId, scanner.getTotalSumForSanityCheck());
-            //logger.info("sanity check for Cuboid " + cuboidId + " cost " + (System.currentTimeMillis() - t) + "ms");
         } finally {
             scanner.close();
             builder.close();
         }
 
         long timeSpent = System.currentTimeMillis() - startTime;
-        logger.info("Cuboid " + cuboidId + " has " + count + " rows, build takes " + timeSpent + "ms");
+        logger.info("Cuboid {} has {} rows, build takes {}ms", cuboidId, count, timeSpent);
 
         return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
     }
@@ -498,9 +496,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             return;
         }
         if (Arrays.equals(totalSumForSanityCheck, totalSum) == false) {
-            logger.info("sanityCheck failed when calculate " + cuboidId + " from parent " + parentId);
-            logger.info("Expected: " + Arrays.toString(totalSumForSanityCheck));
-            logger.info("Actually: " + Arrays.toString(totalSum));
+            if(logger.isInfoEnabled()){
+                logger.info("sanityCheck failed when calculate {} from parent {}", cuboidId, parentId);
+                logger.info("Expected: {}", Arrays.toString(totalSumForSanityCheck));
+                logger.info("Actually: {}", Arrays.toString(totalSum));
+            }
             throw new IllegalStateException();
         }
     }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
index 203a975..d0ecba7 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
@@ -48,6 +48,7 @@ public class PercentileSerializer extends DataTypeSerializer<PercentileCounter>
         return current().getBytesEstimate();
     }
 
+    @Override
     protected double getStorageBytesEstimate(double count) {
         return current().getBytesEstimate(count);
     }
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
index bf52f91..faa9988 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
@@ -130,10 +130,10 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
                 // is a trivial expr
                 return f;
             }
-            //else go to default
-        default:
             filter = new UnsupportedTupleFilter(TupleFilter.FilterOperatorEnum.UNSUPPORTED);
             break;
+        default:
+            filter = new UnsupportedTupleFilter(TupleFilter.FilterOperatorEnum.UNSUPPORTED);
         }
 
         for (RexNode operand : call.operands) {
@@ -281,7 +281,6 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
             strValue = ((NlsString) literalValue).getValue();
         } else if (literalValue instanceof GregorianCalendar) {
             GregorianCalendar g = (GregorianCalendar) literalValue;
-            //strValue = "" + g.get(Calendar.YEAR) + "-" + normToTwoDigits(g.get(Calendar.MONTH) + 1) + "-" + normToTwoDigits(g.get(Calendar.DAY_OF_MONTH));
             strValue = Long.toString(g.getTimeInMillis());
         } else if (literalValue instanceof TimeUnitRange) {
             // Extract(x from y) in where clause
diff --git a/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java b/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
index 5aad5e0..6407308 100644
--- a/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
+++ b/query/src/main/java/org/apache/kylin/query/util/QueryUtil.java
@@ -48,6 +48,10 @@ public class QueryUtil {
         String transform(String sql, String project, String defaultSchema);
     }
 
+    static final String KEYWORD_SELECT = "select";
+    static final String KEYWORD_WITH = "with";
+    static final String KEYWORD_EXPLAIN = "explain";
+
     /**
      * @deprecated Deprecated because of KYLIN-3594
      */
@@ -126,7 +130,7 @@ public class QueryUtil {
                 IQueryTransformer t = (IQueryTransformer) ClassUtil.newInstance(clz);
                 transformers.add(t);
             } catch (Exception e) {
-                throw new RuntimeException("Failed to init query transformer", e);
+                throw new IllegalStateException("Failed to init query transformer", e);
             }
         }
 
@@ -178,8 +182,9 @@ public class QueryUtil {
         String sql1 = sql.toLowerCase(Locale.ROOT);
         sql1 = removeCommentInSql(sql1);
         sql1 = sql1.trim();
-        return sql1.startsWith("select") || (sql1.startsWith("with") && sql1.contains("select"))
-                || (sql1.startsWith("explain") && sql1.contains("select"));
+
+        return sql1.startsWith(KEYWORD_SELECT) || (sql1.startsWith(KEYWORD_WITH) && sql1.contains(KEYWORD_SELECT))
+                || (sql1.startsWith(KEYWORD_EXPLAIN) && sql1.contains(KEYWORD_SELECT));
     }
 
     public static String removeCommentInSql(String sql1) {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 5899c7c..51e8ff3 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -56,7 +56,7 @@ public class KafkaSampleProducer {
     private static final ObjectMapper mapper = new ObjectMapper();
 
     public static void main(String[] args) throws Exception {
-        logger.info("args: {}", Arrays.toString(args));
+        if(logger.isInfoEnabled()) logger.info("args: {}", Arrays.toString(args));
         OptionsHelper optionsHelper = new OptionsHelper();
         Options options = new Options();
         options.addOption(OPTION_TOPIC);
@@ -109,12 +109,12 @@ public class KafkaSampleProducer {
         props.put("buffer.memory", 33554432);
         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-
+        long startTime = System.currentTimeMillis();
         try (Producer<String, String> producer = new KafkaProducer<>(props)) {
             boolean alive = true;
             Random rnd = new Random();
             Map<String, Object> record = new HashMap<>();
-            while (alive == true) {
+            while (alive) {
                 //add normal record
                 record.put("order_time", (new Date().getTime()));
                 record.put("country", countries.get(rnd.nextInt(countries.size())));
@@ -132,9 +132,12 @@ public class KafkaSampleProducer {
                 record.put("user", user);
                 //send message
                 ProducerRecord<String, String> data = new ProducerRecord<>(topic, System.currentTimeMillis() + "", mapper.writeValueAsString(record));
-                System.out.println("Sending 1 message: " + JsonUtil.writeValueAsString(record));
+                if(logger.isInfoEnabled()) logger.info("Sending 1 message: {}", JsonUtil.writeValueAsString(record));
                 producer.send(data);
                 Thread.sleep(interval);
+                if(System.currentTimeMillis() - startTime >= 7 * 24 * 3600 * 1000){ // stop only after a week of running, not on the first pass
+                    alive = false;
+                }
             }
         }
     }
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
index d6367e5..1a659cb 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/RowCounterCLI.java
@@ -59,9 +59,10 @@ public class RowCounterCLI {
         } else {
             logger.info("startkey lenght: {}", startKey.length);
         }
-
-        logger.info("start key in binary: {}", Bytes.toStringBinary(startKey));
-        logger.info("end key in binary: {}", Bytes.toStringBinary(endKey));
+        if(logger.isInfoEnabled()){
+            logger.info("start key in binary: {}", Bytes.toStringBinary(startKey));
+            logger.info("end key in binary: {}", Bytes.toStringBinary(endKey));
+        }
 
         Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();