You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/11/27 14:43:50 UTC

[kylin] branch master updated: KYLIN-3597 fix static code issues

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bcc698  KYLIN-3597 fix static code issues
7bcc698 is described below

commit 7bcc6987e5bbcfd57f320aa19c4fba67ed02aaea
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Nov 27 18:41:24 2018 +0800

    KYLIN-3597 fix static code issues
    
    fix UT
---
 .../java/org/apache/kylin/common/util/Bytes.java   |  47 ++++-----
 .../validation/rule/AggregationGroupRule.java      |  28 +++---
 .../kylin/job/execution/AbstractExecutable.java    |  27 ++---
 .../BitmapIntersectDistinctCountAggFunc.java       |   8 +-
 .../kylin/metadata/cachesync/Broadcaster.java      |  23 ++---
 .../datasource/adaptor/AbstractJdbcAdaptor.java    |  10 +-
 .../sdk/datasource/adaptor/DefaultAdaptor.java     |   3 +-
 .../apache/kylin/engine/mr/ByteArrayWritable.java  |  14 +--
 .../mr/steps/RowKeyDistributionCheckerJob.java     |  93 -----------------
 .../mr/steps/RowKeyDistributionCheckerMapper.java  | 112 ---------------------
 .../mr/steps/RowKeyDistributionCheckerReducer.java |  51 ----------
 .../broadcaster/BroadcasterReceiveServlet.java     |   9 +-
 .../kylin/source/jdbc/extensible/JdbcExplorer.java |  88 ++++++++--------
 .../source/kafka/util/KafkaSampleProducer.java     |  12 +--
 14 files changed, 124 insertions(+), 401 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
index 33fc31a..a267921 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
@@ -41,7 +41,6 @@ import java.util.Locale;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import sun.misc.Unsafe;
 
 /**
@@ -116,6 +115,7 @@ public class Bytes {
     // JHat says BU is 56 bytes.
     // SizeOf which uses java.lang.instrument says 24 bytes. (3 longs?)
     public static final int ESTIMATED_HEAP_TAX = 16;
+    public static final String LENGTH_MUST_BE_GREATER_THAN_0 = "length must be greater than 0";
 
     /**
      * Returns length of the byte array, returning 0 if the array is null.
@@ -491,7 +491,7 @@ public class Bytes {
      * @return incremented offset
      */
     public static int putLongUnsafe(byte[] bytes, int offset, long val) {
-        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
             val = Long.reverseBytes(val);
         }
         org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe.putLong(bytes,
@@ -652,7 +652,7 @@ public class Bytes {
      * @return the int value
      */
     public static int toIntUnsafe(byte[] bytes, int offset) {
-        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
             return Integer.reverseBytes(
                     org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe
                             .getInt(bytes, (long) offset
@@ -672,7 +672,7 @@ public class Bytes {
      * @return the short value
      */
     public static short toShortUnsafe(byte[] bytes, int offset) {
-        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
             return Short.reverseBytes(
                     org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe
                             .getShort(bytes, (long) offset
@@ -692,7 +692,7 @@ public class Bytes {
      * @return the long value
      */
     public static long toLongUnsafe(byte[] bytes, int offset) {
-        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
             return Long.reverseBytes(
                     org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe
                             .getLong(bytes, (long) offset
@@ -763,7 +763,7 @@ public class Bytes {
      * @return incremented offset
      */
     public static int putIntUnsafe(byte[] bytes, int offset, int val) {
-        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
             val = Integer.reverseBytes(val);
         }
         org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe.putInt(bytes,
@@ -880,7 +880,7 @@ public class Bytes {
      * @return incremented offset
      */
     public static int putShortUnsafe(byte[] bytes, int offset, short val) {
-        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+        if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
             val = Short.reverseBytes(val);
         }
         org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe.putShort(bytes,
@@ -1097,7 +1097,7 @@ public class Bytes {
                 }
             }
 
-            static final boolean littleEndian = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+            static final boolean LITTLEENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
 
             /**
              * Returns true if x1 is less than x2, when both values are treated as
@@ -1164,7 +1164,7 @@ public class Bytes {
                     long lw = theUnsafe.getLong(buffer1, offset1Adj + (long) i);
                     long rw = theUnsafe.getLong(buffer2, offset2Adj + (long) i);
                     long diff = lw ^ rw;
-                    if (littleEndian) {
+                    if (LITTLEENDIAN) {
                         lw = Long.reverseBytes(lw);
                         rw = Long.reverseBytes(rw);
                     }
@@ -1177,7 +1177,7 @@ public class Bytes {
                 if (minLength - offset >= SIZEOF_INT) {
                     int il = theUnsafe.getInt(buffer1, offset1Adj + offset);
                     int ir = theUnsafe.getInt(buffer2, offset2Adj + offset);
-                    if (littleEndian) {
+                    if (LITTLEENDIAN) {
                         il = Integer.reverseBytes(il);
                         ir = Integer.reverseBytes(ir);
                     }
@@ -1189,7 +1189,7 @@ public class Bytes {
                 if (minLength - offset >= SIZEOF_SHORT) {
                     short sl = theUnsafe.getShort(buffer1, offset1Adj + offset);
                     short sr = theUnsafe.getShort(buffer2, offset2Adj + offset);
-                    if (littleEndian) {
+                    if (LITTLEENDIAN) {
                         sl = Short.reverseBytes(sl);
                         sr = Short.reverseBytes(sr);
                     }
@@ -1316,10 +1316,7 @@ public class Bytes {
 
     /** Compute hash for binary data. */
     public static int hashBytes(byte[] bytes, int offset, int length) {
-        int hash = 1;
-        for (int i = offset; i < offset + length; i++)
-            hash = (31 * hash) + (int) bytes[i];
-        return hash;
+        return hashCode(bytes, offset, length);
     }
 
     /**
@@ -1537,12 +1534,7 @@ public class Bytes {
 
         };
 
-        return new Iterable<byte[]>() {
-            @Override
-            public Iterator<byte[]> iterator() {
-                return iterator;
-            }
-        };
+        return () -> iterator;
     }
 
     /**
@@ -1792,10 +1784,7 @@ public class Bytes {
 
     public static boolean equals(List<byte[]> a, List<byte[]> b) {
         if (a == null) {
-            if (b == null) {
-                return true;
-            }
-            return false;
+            return b == null;
         }
         if (b == null) {
             return false;
@@ -1894,7 +1883,7 @@ public class Bytes {
      */
     public static void zero(byte[] b, int offset, int length) {
         checkPositionIndex(offset, b.length, "offset");
-        checkArgument(length > 0, "length must be greater than 0");
+        checkArgument(length > 0, LENGTH_MUST_BE_GREATER_THAN_0);
         checkPositionIndex(offset + length, b.length, "offset + length");
         Arrays.fill(b, offset, offset + length, (byte) 0);
     }
@@ -1919,7 +1908,7 @@ public class Bytes {
      */
     public static void random(byte[] b, int offset, int length) {
         checkPositionIndex(offset, b.length, "offset");
-        checkArgument(length > 0, "length must be greater than 0");
+        checkArgument(length > 0, LENGTH_MUST_BE_GREATER_THAN_0);
         checkPositionIndex(offset + length, b.length, "offset + length");
         byte[] buf = new byte[length];
         RNG.nextBytes(buf);
@@ -1964,7 +1953,7 @@ public class Bytes {
      * @param b
      */
     public static String toHex(byte[] b) {
-        checkArgument(b.length > 0, "length must be greater than 0");
+        checkArgument(b.length > 0, LENGTH_MUST_BE_GREATER_THAN_0);
         return String.format(Locale.ROOT, "%x", new BigInteger(1, b));
     }
 
@@ -1975,7 +1964,7 @@ public class Bytes {
      * @param hex
      */
     public static byte[] fromHex(String hex) {
-        checkArgument(hex.length() > 0, "length must be greater than 0");
+        checkArgument(hex.length() > 0, LENGTH_MUST_BE_GREATER_THAN_0);
         checkArgument(hex.length() % 2 == 0, "length must be a multiple of 2");
         // Make sure letters are upper case
         hex = hex.toUpperCase(Locale.ROOT);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/AggregationGroupRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/AggregationGroupRule.java
index 2b0358d..4670ac1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/AggregationGroupRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/AggregationGroupRule.java
@@ -40,17 +40,16 @@ import com.google.common.collect.Lists;
  */
 public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
 
+    public static final String AGGREGATION_GROUP = "Aggregation group ";
+
     @Override
     public void validate(CubeDesc cube, ValidateContext context) {
         inner(cube, context);
     }
 
-    public AggregationGroupRule() {
-    }
-
     private void inner(CubeDesc cube, ValidateContext context) {
 
-        if (cube.getAggregationGroups() == null || cube.getAggregationGroups().size() == 0) {
+        if (cube.getAggregationGroups() == null || cube.getAggregationGroups().isEmpty()) {
             context.addResult(ResultLevel.ERROR, "Cube should have at least one Aggregation group.");
             return;
         }
@@ -58,12 +57,12 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
         int index = 1;
         for (AggregationGroup agg : cube.getAggregationGroups()) {
             if (agg.getIncludes() == null) {
-                context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " 'includes' field not set");
+                context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " 'includes' field not set");
                 continue;
             }
 
             if (agg.getSelectRule() == null) {
-                context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " 'select rule' field not set");
+                context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " 'select rule' field not set");
                 continue;
             }
 
@@ -107,20 +106,20 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
                         notIncluded.add(dim);
                     }
                 }
-                context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " 'includes' dimensions not include all the dimensions:" + notIncluded.toString());
+                context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " 'includes' dimensions not include all the dimensions:" + notIncluded.toString());
                 continue;
             }
 
             if (CollectionUtils.containsAny(mandatoryDims, hierarchyDims)) {
                 Set<String> intersection = new HashSet<>(mandatoryDims);
                 intersection.retainAll(hierarchyDims);
-                context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " mandatory dimension has overlap with hierarchy dimension: " + intersection.toString());
+                context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " mandatory dimension has overlap with hierarchy dimension: " + intersection.toString());
                 continue;
             }
             if (CollectionUtils.containsAny(mandatoryDims, jointDims)) {
                 Set<String> intersection = new HashSet<>(mandatoryDims);
                 intersection.retainAll(jointDims);
-                context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " mandatory dimension has overlap with joint dimension: " + intersection.toString());
+                context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " mandatory dimension has overlap with joint dimension: " + intersection.toString());
                 continue;
             }
 
@@ -134,7 +133,7 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
                     }
 
                     if (oneJoint.size() < 2) {
-                        context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " require at least 2 dimensions in a joint: " + oneJoint.toString());
+                        context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " require at least 2 dimensions in a joint: " + oneJoint.toString());
                         continue;
                     }
                     jointDimNum += oneJoint.size();
@@ -149,13 +148,13 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
                                 overlapHierarchies++;
                             }
                             if (share.size() > 1) {
-                                context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " joint dimensions has overlap with more than 1 dimensions in same hierarchy: " + share.toString());
+                                context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " joint dimensions has overlap with more than 1 dimensions in same hierarchy: " + share.toString());
                                 continue;
                             }
                         }
 
                         if (overlapHierarchies > 1) {
-                            context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " joint dimensions has overlap with more than 1 hierarchies");
+                            context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " joint dimensions has overlap with more than 1 hierarchies");
                             continue;
                         }
                     }
@@ -175,7 +174,7 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
                         }
                         existing.addAll(oneJoint);
                     }
-                    context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " a dimension exists in more than one joint: " + overlap.toString());
+                    context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " a dimension exists in more than one joint: " + overlap.toString());
                     continue;
                 }
             }
@@ -186,9 +185,8 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
                 combination = getMaxCombinations(cube) + 1;
             } finally {
                 if (combination > getMaxCombinations(cube)) {
-                    String msg = "Aggregation group " + index + " has too many combinations, current combination is " + combination + ", max allowed combination is " + getMaxCombinations(cube) + "; use 'mandatory'/'hierarchy'/'joint' to optimize; or update 'kylin.cube.aggrgroup.max-combination' to a bigger value.";
+                    String msg = AGGREGATION_GROUP + index + " has too many combinations, current combination is " + combination + ", max allowed combination is " + getMaxCombinations(cube) + "; use 'mandatory'/'hierarchy'/'joint' to optimize; or update 'kylin.cube.aggrgroup.max-combination' to a bigger value.";
                     context.addResult(ResultLevel.ERROR, msg);
-                    continue;
                 }
             }
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index b8d3144..8548781 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -57,6 +57,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     protected static final String INTERRUPT_TIME = "interruptTime";
 
     protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
+    public static final String NO_NEED_TO_SEND_EMAIL_USER_LIST_IS_EMPTY = "no need to send email, user list is empty";
     protected int retry = 0;
 
     private KylinConfig config;
@@ -103,8 +104,8 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
                     exception = e;
                     try {
                         Thread.sleep(1000L * (long) Math.pow(4, nRetry));
-                    } catch (InterruptedException exp) {
-                        throw new RuntimeException(exp);
+                    } catch (InterruptedException e1) {
+                        throw new IllegalStateException(e1);
                     }
                 } else {
                     throw e;
@@ -145,7 +146,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     @Override
     public final ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException {
 
-        logger.info("Executing AbstractExecutable (" + this.getName() + ")");
+        logger.info("Executing AbstractExecutable ({})", this.getName());
 
         Preconditions.checkArgument(executableContext instanceof DefaultContext);
         ExecuteResult result = null;
@@ -156,14 +157,14 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
             Throwable realException;
             do {
                 if (retry > 0) {
-                    logger.info("Retry " + retry);
+                    logger.info("Retry {}", retry);
                 }
                 catchedException = null;
                 result = null;
                 try {
                     result = doWork(executableContext);
                 } catch (Throwable e) {
-                    logger.error("error running Executable: " + this.toString());
+                    logger.error("error running Executable: {}", this.toString());
                     catchedException = e;
                 }
                 retry++;
@@ -191,7 +192,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
     protected void handleMetadataPersistException(ExecutableContext context, Throwable exception) {
         final String[] adminDls = context.getConfig().getAdminDls();
         if (adminDls == null || adminDls.length < 1) {
-            logger.warn("no need to send email, user list is empty");
+            logger.warn(NO_NEED_TO_SEND_EMAIL_USER_LIST_IS_EMPTY);
             return;
         }
         List<String> users = Lists.newArrayList(adminDls);
@@ -199,7 +200,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         Map<String, Object> dataMap = Maps.newHashMap();
         dataMap.put("job_name", getName());
         dataMap.put("env_name", context.getConfig().getDeployEnv());
-        dataMap.put("submitter", StringUtil.noBlank(getSubmitter(), "missing submitter"));
+        dataMap.put(SUBMITTER, StringUtil.noBlank(getSubmitter(), "missing submitter"));
         dataMap.put("job_engine", MailNotificationUtil.getLocalHostName());
         dataMap.put("error_log",
                 Matcher.quoteReplacement(StringUtil.noBlank(exception.getMessage(), "no error message")));
@@ -314,7 +315,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         try {
             List<String> users = getAllNofifyUsers(config);
             if (users.isEmpty()) {
-                logger.debug("no need to send email, user list is empty");
+                logger.debug(NO_NEED_TO_SEND_EMAIL_USER_LIST_IS_EMPTY);
                 return;
             }
             final Pair<String, String> email = formatNotifications(context, state);
@@ -341,10 +342,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
             logger.warn("no need to send email, content is null");
             return;
         }
-        logger.info("prepare to send email to:" + users);
-        logger.info("job name:" + getName());
-        logger.info("submitter:" + getSubmitter());
-        logger.info("notify list:" + users);
+        logger.info("prepare to send email to:{}", users);
+        logger.info("job name:{}", getName());
+        logger.info("submitter:{}", getSubmitter());
+        logger.info("notify list:{}", users);
         new MailService(kylinConfig).sendMail(users, email.getFirst(), email.getSecond());
     }
 
@@ -352,7 +353,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
         try {
             List<String> users = getAllNofifyUsers(config);
             if (users.isEmpty()) {
-                logger.debug("no need to send email, user list is empty");
+                logger.debug(NO_NEED_TO_SEND_EMAIL_USER_LIST_IS_EMPTY);
                 return;
             }
             doSendMail(config, users, email);
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
index a1e2665..9771352 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -50,10 +50,8 @@ public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount
                 this.keyList = keyList;
             }
             if (this.keyList != null && this.keyList.contains(key)) {
-                BitmapCounter counter = map.get(key);
-                if (counter == null) {
-                    map.put(key, counter = factory.newBitmap());
-                }
+                BitmapCounter counter = map.computeIfAbsent(key, o -> factory.newBitmap());
+
                 counter.orWith((BitmapCounter) value);
             }
         }
@@ -78,7 +76,7 @@ public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount
                     counter.andWith(c);
                 }
             }
-            return counter.getCount();
+            return counter != null ? counter.getCount() : 0;
         }
     }
 
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index d5ecc16..b9c03b6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -98,7 +98,7 @@ public class Broadcaster {
         if (nodes == null || nodes.length < 1) {
             logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
         }
-        logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
+        logger.debug("{} nodes in the cluster: {}", (nodes == null ? 0 : nodes.length), Arrays.toString(nodes));
 
         announceMainLoop.execute(new Runnable() {
             @Override
@@ -110,7 +110,7 @@ public class Broadcaster {
                         final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
 
                         String[] restServers = config.getRestServers();
-                        logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
+                        logger.debug("Servers in the cluster: {}", Arrays.toString(restServers));
                         for (final String node : restServers) {
                             if (restClientMap.containsKey(node) == false) {
                                 restClientMap.put(node, new RestClient(node));
@@ -120,7 +120,7 @@ public class Broadcaster {
                         String toWhere = broadcastEvent.getTargetNode();
                         if (toWhere == null)
                             toWhere = "all";
-                        logger.debug("Announcing new broadcast to " + toWhere + ": " + broadcastEvent);
+                        logger.debug("Announcing new broadcast to {}: {}", toWhere, broadcastEvent);
                         
                         for (final String node : restServers) {
                             if (!(toWhere.equals("all") || toWhere.equals(node)))
@@ -196,11 +196,7 @@ public class Broadcaster {
     }
 
     private static void addListener(Map<String, List<Listener>> lmap, String entity, Listener listener) {
-        List<Listener> list = lmap.get(entity);
-        if (list == null) {
-            list = new ArrayList<>();
-            lmap.put(entity, list);
-        }
+        List<Listener> list = lmap.computeIfAbsent(entity, s -> new ArrayList<>());
         list.add(listener);
     }
 
@@ -244,7 +240,7 @@ public class Broadcaster {
         if (list.isEmpty())
             return;
 
-        logger.debug("Broadcasting " + event + ", " + entity + ", " + cacheKey);
+        logger.debug("Broadcasting {}, {}, {}", event, entity, cacheKey);
 
         switch (entity) {
         case SYNC_ALL:
@@ -278,7 +274,7 @@ public class Broadcaster {
             break;
         }
 
-        logger.debug("Done broadcasting " + event + ", " + entity + ", " + cacheKey);
+        logger.debug("Done broadcasting {}, {}, {}", event, entity, cacheKey);
     }
 
     /**
@@ -363,7 +359,7 @@ public class Broadcaster {
         }
     }
 
-    abstract public static class Listener {
+    public abstract static class Listener {
         public void onClearAll(Broadcaster broadcaster) throws IOException {
         }
 
@@ -452,10 +448,7 @@ public class Broadcaster {
             if (!StringUtils.equals(cacheKey, other.cacheKey)) {
                 return false;
             }
-            if (!StringUtils.equals(entity, other.entity)) {
-                return false;
-            }
-            return true;
+            return StringUtils.equals(entity, other.entity);
         }
 
         @Override
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
index 3669a32..f3dc78e 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
@@ -23,8 +23,8 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
-import java.util.LinkedHashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import javax.sql.rowset.CachedRowSet;
@@ -57,7 +57,7 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
      * @param config Basic configuration of JDBC source, such as driver name, URL, username, password.
      * @throws Exception If datasource cannot be connected.
      */
-    protected AbstractJdbcAdaptor(AdaptorConfig config) throws Exception {
+    protected AbstractJdbcAdaptor(AdaptorConfig config) throws ClassNotFoundException {
         this.config = config;
         this.dataSourceDef = DataSourceDefProvider.getInstance().getById(config.datasourceId);
 
@@ -138,7 +138,7 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
         try {
             closeable.close();
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new IllegalStateException(e);
         }
     }
 
@@ -151,7 +151,7 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
         try {
             dataSource.close();
         } catch (SQLException e) {
-            throw new RuntimeException(e);
+            throw new IllegalStateException(e);
         }
     }
 
@@ -367,7 +367,7 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
      * @param columnInfo The column information, in the pair of NAME -> TYPE
      * @return A set of SQL Statements which can be executed in JDBC source.
      */
-    public abstract String[] buildSqlToCreateTable(String identity, LinkedHashMap<String, String> columnInfo);
+    public abstract String[] buildSqlToCreateTable(String identity, Map<String, String> columnInfo);
 
     /**
      * To build a set of sql statements to create view in JDBC source.
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
index 3a14a1d..442b78e 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
@@ -22,7 +22,6 @@ import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Types;
 import java.util.ArrayList;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Locale;
@@ -246,7 +245,7 @@ public class DefaultAdaptor extends AbstractJdbcAdaptor {
     }
 
     @Override
-    public String[] buildSqlToCreateTable(String tableIdentity, LinkedHashMap<String, String> columnInfo) {
+    public String[] buildSqlToCreateTable(String tableIdentity, Map<String, String> columnInfo) {
         String dropsql = "DROP TABLE IF EXISTS " + tableIdentity;
         String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity;
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
index a504899..037ab0b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -130,12 +130,12 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
      * @see java.lang.Object#equals(java.lang.Object)
      */
     @Override
-    public boolean equals(Object right_obj) {
-        if (right_obj instanceof byte[]) {
-            return compareTo((byte[]) right_obj) == 0;
+    public boolean equals(Object other) {
+        if (other instanceof byte[]) {
+            return compareTo((byte[]) other) == 0;
         }
-        if (right_obj instanceof ByteArrayWritable) {
-            return compareTo((ByteArrayWritable) right_obj) == 0;
+        if (other instanceof ByteArrayWritable) {
+            return compareTo((ByteArrayWritable) other) == 0;
         }
         return false;
     }
@@ -162,7 +162,7 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
     /** A Comparator optimized for ByteArrayWritable.
      */
     public static class Comparator extends WritableComparator {
-        private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
+        private BytesWritable.Comparator instance = new BytesWritable.Comparator();
 
         /** constructor */
         public Comparator() {
@@ -174,7 +174,7 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
          */
         @Override
         public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            return comparator.compare(b1, s1, l1, b2, s2, l2);
+            return instance.compare(b1, s1, l1, b2, s2, l2);
         }
     }
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
deleted file mode 100644
index 3419949..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
-
-    @SuppressWarnings("static-access")
-    protected static final Option ROW_KEY_STATS_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
-
-    @Override
-    public int run(String[] args) throws Exception {
-        Options options = new Options();
-
-        options.addOption(OPTION_INPUT_PATH);
-        options.addOption(OPTION_OUTPUT_PATH);
-        options.addOption(OPTION_JOB_NAME);
-        options.addOption(ROW_KEY_STATS_FILE_PATH);
-
-        parseOptions(options, args);
-
-        String statsFilePath = getOptionValue(ROW_KEY_STATS_FILE_PATH);
-
-        // start job
-        String jobName = getOptionValue(OPTION_JOB_NAME);
-        job = Job.getInstance(getConf(), jobName);
-
-        setJobClasspath(job, KylinConfig.getInstanceFromEnv());
-
-        addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
-        Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
-        FileOutputFormat.setOutputPath(job, output);
-
-        // Mapper
-        job.setInputFormatClass(SequenceFileInputFormat.class);
-        job.setMapperClass(RowKeyDistributionCheckerMapper.class);
-        job.setMapOutputKeyClass(Text.class);
-        job.setMapOutputValueClass(LongWritable.class);
-
-        // Reducer - only one
-        job.setReducerClass(RowKeyDistributionCheckerReducer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(LongWritable.class);
-        job.setNumReduceTasks(1);
-
-        job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
-
-        this.deletePath(job.getConfiguration(), output);
-
-        return waitForCompletion(job);
-    }
-
-    public static void main(String[] args) throws Exception {
-        int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
-        System.exit(exitCode);
-    }
-
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
deleted file mode 100644
index d428dcb..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.engine.mr.KylinMapper;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
-    String rowKeyStatsFilePath;
-    byte[][] splitKeys;
-    Map<Text, Long> resultMap;
-    List<Text> keyList;
-
-    @Override
-    protected void doSetup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-
-        rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
-        splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
-
-        resultMap = new HashMap<Text, Long>();
-        keyList = new ArrayList<Text>();
-        for (int i = 0; i < splitKeys.length; i++) {
-            Text key = new Text(splitKeys[i]);
-            resultMap.put(key, 0L);
-            keyList.add(new Text(splitKeys[i]));
-        }
-    }
-
-    @Override
-    public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
-        for (Text t : keyList) {
-            if (key.compareTo(t) < 0) {
-                Long v = resultMap.get(t);
-                long length = (long)key.getLength() + value.getLength();
-                v += length;
-                resultMap.put(t, v);
-                break;
-            }
-        }
-    }
-
-    @Override
-    protected void doCleanup(Context context) throws IOException, InterruptedException {
-        LongWritable outputValue = new LongWritable();
-        for (Entry<Text, Long> kv : resultMap.entrySet()) {
-            outputValue.set(kv.getValue());
-            context.write(kv.getKey(), outputValue);
-        }
-    }
-
-    @SuppressWarnings("deprecation")
-    public byte[][] getSplits(Configuration conf, Path path) {
-        List<byte[]> rowkeyList = new ArrayList<byte[]>();
-        SequenceFile.Reader reader = null;
-        try {
-            reader = new SequenceFile.Reader(HadoopUtil.getFileSystem(path, conf), path, conf);
-            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
-            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
-            while (reader.next(key, value)) {
-                byte[] tmp = ((Text) key).copyBytes();
-                if (rowkeyList.contains(tmp) == false) {
-                    rowkeyList.add(tmp);
-                }
-            }
-        } catch (Exception e) {
-            e.printStackTrace();
-        } finally {
-            IOUtils.closeStream(reader);
-        }
-
-        byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-
-        return retValue;
-    }
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
deleted file mode 100644
index af4a7df..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author ysong1
- * 
- */
-public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
-    LongWritable outputKey = new LongWritable(0L);
-
-    @Override
-    protected void doSetup(Context context) throws IOException {
-        super.bindCurrentConfiguration(context.getConfiguration());
-    }
-
-    @Override
-    public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
-        long length = 0;
-        for (LongWritable v : values) {
-            length += v.get();
-        }
-
-        outputKey.set(length);
-        context.write(key, outputKey);
-    }
-}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java b/server-base/src/main/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
index c6247f0..a277cf3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
@@ -24,7 +24,6 @@ import java.io.InputStreamReader;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import javax.servlet.ServletException;
 import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
@@ -40,7 +39,7 @@ public class BroadcasterReceiveServlet extends HttpServlet {
         void handle(String type, String name, String event);
     }
 
-    private BroadcasterHandler handler;
+    private final BroadcasterHandler handler;
 
     public BroadcasterReceiveServlet(BroadcasterHandler handler) {
         this.handler = handler;
@@ -50,16 +49,16 @@ public class BroadcasterReceiveServlet extends HttpServlet {
     private static final Pattern PATTERN2 = Pattern.compile("/(.+)/(.+)");
 
     @Override
-    protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException {
         handle(req, resp);
     }
 
     @Override
-    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
         handle(req, resp);
     }
 
-    private void handle(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+    private void handle(HttpServletRequest req, HttpServletResponse resp) throws IOException {
         final String startString = "/kylin/api/cache";
         final String requestURI = req.getRequestURI();
         final String substring = requestURI.substring(requestURI.indexOf(startString) + startString.length());
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcExplorer.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcExplorer.java
index 0875244..e9330e1 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcExplorer.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcExplorer.java
@@ -74,55 +74,57 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
         tableDesc.setProject(prj);
         tableDesc.setSourceType(JdbcSource.SOURCE_ID);
 
-        CachedRowSet tables = dataSource.getTable(database, table);
-        String tableType = null;
-        while (tables.next()) {
-            tableType = tables.getString("TABLE_TYPE");
-        }
-        if (tableType != null) {
-            tableDesc.setTableType(tableType);
-        } else {
-            throw new RuntimeException(String.format(Locale.ROOT, "table %s not found in schema:%s", table, database));
+        try (CachedRowSet tables = dataSource.getTable(database, table)) {
+            String tableType = null;
+            while (tables.next()) {
+                tableType = tables.getString("TABLE_TYPE");
+            }
+            if (tableType != null) {
+                tableDesc.setTableType(tableType);
+            } else {
+                throw new RuntimeException(String.format(Locale.ROOT, "table %s not found in schema:%s", table, database));
+            }
         }
 
-        CachedRowSet columns = dataSource.listColumns(database, table);
-        List<ColumnDesc> columnDescs = new ArrayList<>();
-
-        while (columns.next()) {
-            String cname = columns.getString("COLUMN_NAME");
-            int type = columns.getInt("DATA_TYPE");
-            int csize = columns.getInt("COLUMN_SIZE");
-            int digits = columns.getInt("DECIMAL_DIGITS");
-            int pos = columns.getInt("ORDINAL_POSITION");
-            String remarks = columns.getString("REMARKS");
-
-            ColumnDesc cdesc = new ColumnDesc();
-            cdesc.setName(cname.toUpperCase(Locale.ROOT));
-
-            String kylinType = dataSource.toKylinTypeName(type);
-            if ("any".equals(kylinType)) {
-                String typeName = columns.getString("TYPE_NAME");
-                int kylinTypeId = dataSource.toKylinTypeId(typeName, type);
-                kylinType = dataSource.toKylinTypeName(kylinTypeId);
+        try (CachedRowSet columns = dataSource.listColumns(database, table)) {
+            List<ColumnDesc> columnDescs = new ArrayList<>();
+
+            while (columns.next()) {
+                String cname = columns.getString("COLUMN_NAME");
+                int type = columns.getInt("DATA_TYPE");
+                int csize = columns.getInt("COLUMN_SIZE");
+                int digits = columns.getInt("DECIMAL_DIGITS");
+                int pos = columns.getInt("ORDINAL_POSITION");
+                String remarks = columns.getString("REMARKS");
+
+                ColumnDesc cdesc = new ColumnDesc();
+                cdesc.setName(cname.toUpperCase(Locale.ROOT));
+
+                String kylinType = dataSource.toKylinTypeName(type);
+                if ("any".equals(kylinType)) {
+                    String typeName = columns.getString("TYPE_NAME");
+                    int kylinTypeId = dataSource.toKylinTypeId(typeName, type);
+                    kylinType = dataSource.toKylinTypeName(kylinTypeId);
+                }
+                int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? csize : -1;
+                int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? digits : -1;
+
+                cdesc.setDatatype(new DataType(kylinType, precision, scale).toString());
+                cdesc.setId(String.valueOf(pos));
+                cdesc.setComment(remarks);
+                columnDescs.add(cdesc);
             }
-            int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? csize : -1;
-            int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? digits : -1;
 
-            cdesc.setDatatype(new DataType(kylinType, precision, scale).toString());
-            cdesc.setId(String.valueOf(pos));
-            cdesc.setComment(remarks);
-            columnDescs.add(cdesc);
-        }
-
-        tableDesc.setColumns(columnDescs.toArray(new ColumnDesc[columnDescs.size()]));
+            tableDesc.setColumns(columnDescs.toArray(new ColumnDesc[columnDescs.size()]));
 
-        TableExtDesc tableExtDesc = new TableExtDesc();
-        tableExtDesc.setIdentity(tableDesc.getIdentity());
-        tableExtDesc.setUuid(UUID.randomUUID().toString());
-        tableExtDesc.setLastModified(0);
-        tableExtDesc.init(prj);
+            TableExtDesc tableExtDesc = new TableExtDesc();
+            tableExtDesc.setIdentity(tableDesc.getIdentity());
+            tableExtDesc.setUuid(UUID.randomUUID().toString());
+            tableExtDesc.setLastModified(0);
+            tableExtDesc.init(prj);
 
-        return Pair.newPair(tableDesc, tableExtDesc);
+            return Pair.newPair(tableDesc, tableExtDesc);
+        }
     }
 
     @Override
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 56e2dd5..5899c7c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -52,13 +52,13 @@ public class KafkaSampleProducer {
     private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
     private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
 
+    protected static final String OTHER = "Other";
     private static final ObjectMapper mapper = new ObjectMapper();
 
     public static void main(String[] args) throws Exception {
         logger.info("args: {}", Arrays.toString(args));
         OptionsHelper optionsHelper = new OptionsHelper();
         Options options = new Options();
-        String topic, broker;
         options.addOption(OPTION_TOPIC);
         options.addOption(OPTION_BROKER);
         options.addOption(OPTION_INTERVAL);
@@ -66,8 +66,8 @@ public class KafkaSampleProducer {
 
         logger.info("options: '{}'", optionsHelper.getOptionsAsString());
 
-        topic = optionsHelper.getOptionValue(OPTION_TOPIC);
-        broker = optionsHelper.getOptionValue(OPTION_BROKER);
+        final String topic = optionsHelper.getOptionValue(OPTION_TOPIC);
+        final String broker = optionsHelper.getOptionValue(OPTION_BROKER);
 
         long interval = 10;
         String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
@@ -83,18 +83,18 @@ public class KafkaSampleProducer {
         countries.add("JAPAN");
         countries.add("KOREA");
         countries.add("US");
-        countries.add("Other");
+        countries.add(OTHER);
         List<String> category = new ArrayList<>();
         category.add("BOOK");
         category.add("TOY");
         category.add("CLOTH");
         category.add("ELECTRONIC");
-        category.add("Other");
+        category.add(OTHER);
         List<String> devices = new ArrayList<>();
         devices.add("iOS");
         devices.add("Windows");
         devices.add("Andriod");
-        devices.add("Other");
+        devices.add(OTHER);
 
         List<String> genders = new ArrayList<>();
         genders.add("Male");