You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/26 13:32:51 UTC

[kylin] branch master updated: KYLIN-3597 fix sonar issues

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 95f399e  KYLIN-3597 fix sonar issues
95f399e is described below

commit 95f399e98957992e9a13fa1e71ee319e3018f246
Author: shaofengshi <sh...@apache.org>
AuthorDate: Wed Sep 26 17:21:31 2018 +0800

    KYLIN-3597 fix sonar issues
---
 .../common/persistence/HDFSResourceStore.java      |  14 +-
 .../common/persistence/JDBCConnectionManager.java  |  16 +-
 .../kylin/common/persistence/JDBCResourceDAO.java  |  26 +-
 .../common/persistence/JDBCResourceStore.java      |   3 +-
 .../storage/gtrecord/GTCubeStorageQueryBase.java   |  49 ++--
 .../kylin/storage/hbase/steps/CreateHTableJob.java |  95 +++-----
 .../storage/hbase/util/HbaseStreamingInput.java    | 262 ---------------------
 7 files changed, 81 insertions(+), 384 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
index 1739ce0..e5bef40 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
@@ -65,18 +65,18 @@ public class HDFSResourceStore extends ResourceStore {
         if (path == null) {
             // missing path is not expected, but don't fail it
             path = kylinConfig.getHdfsWorkingDirectory() + "tmp_metadata";
-            logger.warn("Missing path, fall back to " + path);
+            logger.warn("Missing path, fall back to {0}", path);
         }
         
         fs = HadoopUtil.getFileSystem(path);
         Path metadataPath = new Path(path);
         if (fs.exists(metadataPath) == false) {
-            logger.warn("Path not exist in HDFS, create it: " + path);
+            logger.warn("Path not exist in HDFS, create it: {0}", path);
             createMetaFolder(metadataPath);
         }
 
         hdfsMetaPath = metadataPath;
-        logger.info("hdfs meta path : " + hdfsMetaPath.toString());
+        logger.info("hdfs meta path : {0}", hdfsMetaPath.toString());
 
     }
 
@@ -86,7 +86,7 @@ public class HDFSResourceStore extends ResourceStore {
             fs.mkdirs(metaDirName);
         }
 
-        logger.info("hdfs meta path created: " + metaDirName.toString());
+        logger.info("hdfs meta path created: {0}", metaDirName.toString());
     }
 
     @Override
@@ -159,7 +159,7 @@ public class HDFSResourceStore extends ResourceStore {
         Path p = getRealHDFSPath(resPath);
         if (fs.exists(p) && fs.isFile(p)) {
             if (fs.getFileStatus(p).getLen() == 0) {
-                logger.warn("Zero length file: " + p.toString());
+                logger.warn("Zero length file: {0}", p.toString());
             }
             FSDataInputStream in = fs.open(p);
             long t = in.readLong();
@@ -190,9 +190,9 @@ public class HDFSResourceStore extends ResourceStore {
 
     @Override
     protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
-        logger.trace("res path : " + resPath);
+        logger.trace("res path : {0}", resPath);
         Path p = getRealHDFSPath(resPath);
-        logger.trace("put resource : " + p.toUri());
+        logger.trace("put resource : {0}", p.toUri());
         FSDataOutputStream out = null;
         try {
             out = fs.create(p, true);
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
index 753601a..5f56de1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
@@ -42,15 +42,9 @@ public class JDBCConnectionManager {
 
     private static JDBCConnectionManager INSTANCE = null;
 
-    private static Object lock = new Object();
-
-    public static JDBCConnectionManager getConnectionManager() {
+    public static synchronized JDBCConnectionManager getConnectionManager() {
         if (INSTANCE == null) {
-            synchronized (lock) {
-                if (INSTANCE == null) {
-                    INSTANCE = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
-                }
-            }
+            INSTANCE = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
         }
         return INSTANCE;
     }
@@ -67,10 +61,10 @@ public class JDBCConnectionManager {
             dataSource = BasicDataSourceFactory.createDataSource(getDbcpProperties());
             Connection conn = getConn();
             DatabaseMetaData mdm = conn.getMetaData();
-            logger.info("Connected to " + mdm.getDatabaseProductName() + " " + mdm.getDatabaseProductVersion());
+            logger.info("Connected to {0} {1}", mdm.getDatabaseProductName(), mdm.getDatabaseProductVersion());
             closeQuietly(conn);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new IllegalArgumentException(e);
         }
     }
 
@@ -94,7 +88,7 @@ public class JDBCConnectionManager {
             ret.remove("passwordEncrypted");
         }
 
-        logger.info("Connecting to Jdbc with url:" + ret.get("url") + " by user " + ret.get("username"));
+        logger.info("Connecting to Jdbc with url:{0} by user {1}", ret.get("url"), ret.get("username"));
 
         putIfMissing(ret, "driverClassName", "com.mysql.jdbc.Driver");
         putIfMissing(ret, "maxActive", "5");
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
index a226af6..dce0894 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -32,8 +32,10 @@ import java.text.FieldPosition;
 import java.text.MessageFormat;
 import java.util.List;
 import java.util.Locale;
+import java.util.NavigableSet;
 import java.util.TreeSet;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -116,7 +118,7 @@ public class JDBCResourceDAO {
                     if (fetchContent) {
                         try {
                             resource.setContent(getInputStream(resourcePath, rs));
-                        } catch (Throwable e) {
+                        } catch (Exception e) {
                             if (!isAllowBroken) {
                                 throw new SQLException(e);
                             }
@@ -147,8 +149,8 @@ public class JDBCResourceDAO {
     }
 
     //fetch primary key only
-    public TreeSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
-        final TreeSet<String> allResourceName = new TreeSet<>();
+    public NavigableSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
+        final NavigableSet<String> allResourceName = new TreeSet<>();
         executeSql(new SqlOperation() {
             @Override
             public void execute(Connection connection) throws SQLException {
@@ -158,7 +160,7 @@ public class JDBCResourceDAO {
                 rs = pstat.executeQuery();
                 while (rs.next()) {
                     String path = rs.getString(META_TABLE_KEY);
-                    assert path.startsWith(folderPath);
+                    Preconditions.checkState(path.startsWith(folderPath));
                     if (recursive) {
                         allResourceName.add(path);
                     } else {
@@ -192,7 +194,7 @@ public class JDBCResourceDAO {
                         resource.setTimestamp(rs.getLong(META_TABLE_TS));
                         try {
                             resource.setContent(getInputStream(resPath, rs));
-                        } catch (Throwable e) {
+                        } catch (Exception e) {
                             if (!isAllowBroken) {
                                 throw new SQLException(e);
                             }
@@ -240,7 +242,7 @@ public class JDBCResourceDAO {
         if (!skipHdfs) {
             try {
                 deleteHDFSResourceIfExist(resourcePath);
-            } catch (Throwable e) {
+            } catch (Exception e) {
                 throw new SQLException(e);
             }
         }
@@ -389,7 +391,7 @@ public class JDBCResourceDAO {
             bout = new ByteArrayOutputStream();
             IOUtils.copy(resource.getContent(), bout);
             return bout.toByteArray();
-        } catch (Throwable e) {
+        } catch (Exception e) {
             throw new SQLException(e);
         } finally {
             IOUtils.closeQuietly(bout);
@@ -635,10 +637,10 @@ public class JDBCResourceDAO {
             out = redirectFileSystem.create(redirectPath);
             out.write(largeColumn);
             return redirectPath;
-        } catch (Throwable e) {
+        } catch (Exception e) {
             try {
                 rollbackLargeCellFromHdfs(resPath);
-            } catch (Throwable ex) {
+            } catch (Exception ex) {
                 logger.error("fail to roll back resource " + resPath + " in hdfs", ex);
             }
             throw new SQLException(e);
@@ -659,12 +661,12 @@ public class JDBCResourceDAO {
                 redirectFileSystem.delete(redirectPath, true);
                 logger.warn("no backup for hdfs file {} is found, clean it", resPath);
             }
-        } catch (Throwable e) {
+        } catch (Exception e) {
 
             try {
                 //last try to delete redirectPath, because we prefer a deleted rather than incomplete
                 redirectFileSystem.delete(redirectPath, true);
-            } catch (Throwable ex) {
+            } catch (Exception ex) {
                 logger.error("fail to delete resource " + redirectPath + " in hdfs", ex);
             }
 
@@ -679,7 +681,7 @@ public class JDBCResourceDAO {
             if (redirectFileSystem.exists(oldPath)) {
                 redirectFileSystem.delete(oldPath, true);
             }
-        } catch (Throwable e) {
+        } catch (Exception e) {
             logger.warn("error cleaning the backup file for " + redirectPath + ", leave it as garbage", e);
         }
     }
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index ea6e231..a0a58cb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -23,7 +23,6 @@ import java.io.InputStream;
 import java.sql.SQLException;
 import java.util.List;
 import java.util.NavigableSet;
-import java.util.TreeSet;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.KylinConfig;
@@ -91,7 +90,7 @@ public class JDBCResourceStore extends ResourceStore {
     @Override
     protected NavigableSet<String> listResourcesImpl(String folderPath, boolean recursive) throws IOException {
         try {
-            final TreeSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
+            final NavigableSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
             return result.isEmpty() ? null : result;
         } catch (SQLException e) {
             throw new IOException(e);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 269833f..5f4d4be 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -90,9 +90,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         List<CubeSegmentScanner> scanners = Lists.newArrayList();
         SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
         for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
-            CubeSegmentScanner scanner;
-
-            scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
+            CubeSegmentScanner scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
                     request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
                     request.getMetrics(), request.getDynFuncs(), //
                     request.getFilter(), request.getHavingFilter(), request.getContext());
@@ -121,8 +119,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         TupleFilter filter = sqlDigest.filter;
 
         // build dimension & metrics
-        Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
-        Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+        Set<TblColRef> dimensions = new LinkedHashSet<>();
+        Set<FunctionDesc> metrics = new LinkedHashSet<>();
         buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
 
         // all dimensions = groups + other(like filter) dimensions
@@ -136,13 +134,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         otherDimsD.removeAll(groupsD);
 
         // identify cuboid
-        Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+        Set<TblColRef> dimensionsD = new LinkedHashSet<>();
         dimensionsD.addAll(groupsD);
         dimensionsD.addAll(otherDimsD);
         Cuboid cuboid = findCuboid(cubeInstance, dimensionsD, metrics);
         context.setCuboid(cuboid);
 
-        // set cuboid to GridTable mapping;
+        // set cuboid to GridTable mapping
         boolean noDynamicCols;
         // dynamic dimensions
         List<TblColRef> dynGroups = Lists.newArrayList(sqlDigest.dynGroupbyColumns.keySet());
@@ -288,7 +286,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         } else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
             toCheck = filter.getChildren();
         } else {
-            return (Set<CompareTupleFilter>) Collections.EMPTY_SET;
+            return Collections.emptySet();
         }
 
         Set<CompareTupleFilter> result = Sets.newHashSet();
@@ -308,7 +306,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
     private long getQueryFilterMask(Set<TblColRef> filterColumnD) {
         long filterMask = 0;
 
-        logger.info("Filter column set for query: " + filterColumnD.toString());
+        logger.info("Filter column set for query: {0}", filterColumnD.toString());
         if (filterColumnD.size() != 0) {
             RowKeyColDesc[] allColumns = cubeDesc.getRowkey().getRowKeyColumns();
             for (int i = 0; i < allColumns.length; i++) {
@@ -317,7 +315,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
                 }
             }
         }
-        logger.info("Filter mask is: " + filterMask);
+        logger.info("Filter mask is: {0}", filterMask);
         return filterMask;
     }
 
@@ -433,21 +431,19 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
             storageLimitLevel = StorageLimitLevel.LIMIT_ON_RETURN_SIZE;
             logger.debug(
-                    "storageLimitLevel set to LIMIT_ON_RETURN_SIZE because groupD is not clustered at head, groupsD: "
-                            + groupsD //
-                            + " with cuboid columns: " + cuboid.getColumns());
+                    "storageLimitLevel set to LIMIT_ON_RETURN_SIZE because groupD is not clustered at head, groupsD: {0} with cuboid columns: {1}", groupsD.toString(), cuboid.getColumns().toString());
         }
 
         if (!dynGroups.isEmpty()) {
             storageLimitLevel = StorageLimitLevel.NO_LIMIT;
-            logger.debug("Storage limit push down is impossible because the query has dynamic groupby " + dynGroups);
+            logger.debug("Storage limit push down is impossible because the query has dynamic groupby {0}", dynGroups);
         }
 
         // derived aggregation is bad, unless expanded columns are already in group by
         if (!groups.containsAll(derivedPostAggregation)) {
             storageLimitLevel = StorageLimitLevel.NO_LIMIT;
-            logger.debug("storageLimitLevel set to NO_LIMIT because derived column require post aggregation: "
-                    + derivedPostAggregation);
+            logger.debug("storageLimitLevel set to NO_LIMIT because derived column require post aggregation: {0}",
+                    derivedPostAggregation);
         }
 
         if (!TupleFilter.isEvaluableRecursively(filter)) {
@@ -457,7 +453,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
 
         if (!loosenedColumnD.isEmpty()) { // KYLIN-2173
             storageLimitLevel = StorageLimitLevel.NO_LIMIT;
-            logger.debug("storageLimitLevel set to NO_LIMIT because filter is loosened: " + loosenedColumnD);
+            logger.debug("storageLimitLevel set to NO_LIMIT because filter is loosened: {0}", loosenedColumnD);
         }
 
         if (context.hasSort()) {
@@ -469,7 +465,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         for (FunctionDesc functionDesc : functionDescs) {
             if (functionDesc.isDimensionAsMetric()) {
                 storageLimitLevel = StorageLimitLevel.NO_LIMIT;
-                logger.debug("storageLimitLevel set to NO_LIMIT because {} isDimensionAsMetric ", functionDesc);
+                logger.debug("storageLimitLevel set to NO_LIMIT because {0} isDimensionAsMetric ", functionDesc);
             }
         }
 
@@ -488,8 +484,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
         if (!shardByInGroups.isEmpty()) {
             enabled = false;
-            logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: "
-                    + shardByInGroups);
+            logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: {0}",
+                     shardByInGroups);
         }
 
         if (!context.isNeedStorageAggregation()) {
@@ -536,7 +532,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             return null;
 
         // OK, push down
-        logger.info("Push down having filter " + havingFilter);
+        logger.info("Push down having filter {0}", havingFilter);
 
         // convert columns in the filter
         Set<TblColRef> aggrOutCols = new HashSet<>();
@@ -568,21 +564,20 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
         }
 
         if (cuboid.requirePostAggregation()) {
-            logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+            logger.info("exactAggregation is false because cuboid {0}=>{1}", cuboid.getInputID(), cuboid.getId());
             return false;
         }
 
         // derived aggregation is bad, unless expanded columns are already in group by
         if (!groups.containsAll(derivedPostAggregation)) {
-            logger.info("exactAggregation is false because derived column require post aggregation: "
-                    + derivedPostAggregation);
+            logger.info("exactAggregation is false because derived column require post aggregation: {0}",
+                    derivedPostAggregation);
             return false;
         }
 
         // other columns (from filter) is bad, unless they are ensured to have single value
         if (!singleValuesD.containsAll(othersD)) {
-            logger.info("exactAggregation is false because some column not on group by: " + othersD //
-                    + " (single value column: " + singleValuesD + ")");
+            logger.info("exactAggregation is false because some column not on group by: {0} (single value column: {1})", othersD, singleValuesD);
             return false;
         }
 
@@ -610,7 +605,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
             }
         }
 
-        logger.info("exactAggregation is true, cuboid id is " + cuboid.getId());
+        logger.info("exactAggregation is true, cuboid id is {0}", String.valueOf(cuboid.getId()));
         return true;
     }
 
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 7205802..ca8da25 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
 import org.apache.hadoop.io.NullWritable;
@@ -52,7 +52,6 @@ import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
 import org.apache.kylin.engine.mr.common.CuboidShardUtil;
-import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.storage.hbase.HBaseConnection;
 import org.slf4j.Logger;
@@ -108,7 +107,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
             for (Long cuboid : buildingCuboids) {
                 Double cuboidSize = cuboidSizeMap.get(cuboid);
                 if (cuboidSize == null) {
-                    logger.warn(cuboid + "cuboid's size is null will replace by 0");
+                    logger.warn("{0} cuboid's size is null will replace by 0", cuboid);
                     cuboidSize = 0.0;
                 }
                 optimizedCuboidSizeMap.put(cuboid, cuboidSize);
@@ -128,7 +127,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         return 0;
     }
 
-    private void exportHBaseConfiguration(String hbaseTableName) throws Exception {
+    private void exportHBaseConfiguration(String hbaseTableName) throws IOException {
 
         Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
         HadoopUtil.healSickConfig(hbaseConf);
@@ -136,14 +135,12 @@ public class CreateHTableJob extends AbstractHadoopJob {
         HTable table = new HTable(hbaseConf, hbaseTableName);
         HFileOutputFormat2.configureIncrementalLoadMap(job, table);
 
-        logger.info("Saving HBase configuration to " + hbaseConfPath);
+        logger.info("Saving HBase configuration to {0}", hbaseConfPath);
         FileSystem fs = HadoopUtil.getWorkingFileSystem();
         FSDataOutputStream out = null;
         try {
             out = fs.create(new Path(hbaseConfPath));
             job.getConfiguration().writeXml(out);
-        } catch (IOException e) {
-            throw new ExecuteException("Write hbase configuration failed", e);
         } finally {
             IOUtils.closeQuietly(out);
         }
@@ -167,7 +164,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
         float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
 
-        logger.info("Cut for HBase region is " + cut + "GB");
+        logger.info("Cut for HBase region is {0} GB", String.valueOf(cut));
 
         double totalSizeInM = 0;
         for (Double cuboidSize : cubeSizeMap.values()) {
@@ -182,7 +179,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
         nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
         nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
 
-        if (cubeSegment.isEnableSharding()) {//&& (nRegion > 1)) {
+        if (cubeSegment.isEnableSharding()) {
             //use prime nRegions to help random sharding
             int original = nRegion;
             if (nRegion == 0) {
@@ -190,22 +187,22 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             if (nRegion > Short.MAX_VALUE) {
-                logger.info("Too many regions! reduce to " + Short.MAX_VALUE);
+                logger.info("Too many regions! reduce to {0}" + String.valueOf(Short.MAX_VALUE));
                 nRegion = Short.MAX_VALUE;
             }
 
             if (nRegion != original) {
                 logger.info(
-                        "Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+                        "Region count is adjusted from {0} to {1} to help random sharding", String.valueOf(original), String.valueOf(nRegion));
             }
         }
 
         int mbPerRegion = (int) (totalSizeInM / nRegion);
         mbPerRegion = Math.max(1, mbPerRegion);
 
-        logger.info("Total size " + totalSizeInM + "M (estimated)");
-        logger.info("Expecting " + nRegion + " regions.");
-        logger.info("Expecting " + mbPerRegion + " MB per region.");
+        logger.info("Total size {0} M (estimated)", String.valueOf(totalSizeInM));
+        logger.info("Expecting {0} regions.", String.valueOf(nRegion));
+        logger.info("Expecting {0} MB per region.", String.valueOf(mbPerRegion));
 
         if (cubeSegment.isEnableSharding()) {
             //each cuboid will be split into different number of shards
@@ -247,42 +244,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
             }
 
             for (int i = 0; i < nRegion; ++i) {
-                logger.info(
-                        String.format(Locale.ROOT, "Region %d's estimated size is %.2f MB, accounting for %.2f percent",
-                                i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+                logger.info("Region {0}'s estimated size is {1} MB, accounting for {2} percent",
+                                String.valueOf(i), String.valueOf(regionSizes[i]), String.valueOf(100.0 * regionSizes[i] / totalSizeInM));
             }
 
             CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
             saveHFileSplits(innerRegionSplits, mbPerRegion, hfileSplitsOutputFolder, kylinConfig);
             return getSplitsByRegionCount(nRegion);
-
         } else {
-            List<Long> regionSplit = Lists.newArrayList();
-
-            long size = 0;
-            int regionIndex = 0;
-            int cuboidCount = 0;
-            for (int i = 0; i < allCuboids.size(); i++) {
-                long cuboidId = allCuboids.get(i);
-                if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
-                    // if the size already bigger than threshold, or it will exceed by 20%, cut for next region
-                    regionSplit.add(cuboidId);
-                    logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId
-                            + " (" + cuboidCount + ") cuboids");
-                    size = 0;
-                    cuboidCount = 0;
-                    regionIndex++;
-                }
-                size += cubeSizeMap.get(cuboidId);
-                cuboidCount++;
-            }
-
-            byte[][] result = new byte[regionSplit.size()][];
-            for (int i = 0; i < regionSplit.size(); i++) {
-                result[i] = Bytes.toBytes(regionSplit.get(i));
-            }
-
-            return result;
+            throw new IllegalStateException("Not supported");
         }
     }
 
@@ -308,20 +278,20 @@ public class CreateHTableJob extends AbstractHadoopJob {
         }
 
         // keep the tweak for sandbox test
-        if (hfileSizeMB > 0.0 && kylinConfig.isDevEnv()) {
-            hfileSizeMB = mbPerRegion / 2;
+        if (hfileSizeMB > 0.0f && kylinConfig.isDevEnv()) {
+            hfileSizeMB = mbPerRegion / 2f;
         }
 
-        int compactionThreshold = Integer.valueOf(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
-        logger.info("hbase.hstore.compactionThreshold is " + compactionThreshold);
-        if (hfileSizeMB > 0.0 && hfileSizeMB * compactionThreshold < mbPerRegion) {
-            hfileSizeMB = mbPerRegion / compactionThreshold;
+        int compactionThreshold = Integer.parseInt(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
+        logger.info("hbase.hstore.compactionThreshold is {0}", String.valueOf(compactionThreshold));
+        if (hfileSizeMB > 0.0f && hfileSizeMB * compactionThreshold < mbPerRegion) {
+            hfileSizeMB = ((float) mbPerRegion) / compactionThreshold;
         }
 
-        if (hfileSizeMB <= 0) {
+        if (hfileSizeMB <= 0f) {
             hfileSizeMB = mbPerRegion;
         }
-        logger.info("hfileSizeMB:" + hfileSizeMB);
+        logger.info("hfileSizeMB {0}", String.valueOf(hfileSizeMB));
         final Path hfilePartitionFile = new Path(outputFolder, "part-r-00000_hfile");
         short regionCount = (short) innerRegionSplits.size();
 
@@ -331,7 +301,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
                 // skip 0
                 byte[] split = new byte[RowConstants.ROWKEY_SHARDID_LEN];
                 BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
-                splits.add(split); // split by region;
+                splits.add(split);
             }
 
             HashMap<Long, Double> cuboidSize = innerRegionSplits.get(i);
@@ -344,8 +314,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
             for (Long cuboid : allCuboids) {
 
                 if (accumulatedSize >= hfileSizeMB) {
-                    logger.info(
-                            String.format(Locale.ROOT, "Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
+                    logger.info("Region {0}'s hfile {1} size is {2} mb", String.valueOf(i), String.valueOf(j), String.valueOf(accumulatedSize));
                     byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
                     BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
                     System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN,
@@ -359,17 +328,17 @@ public class CreateHTableJob extends AbstractHadoopJob {
 
         }
 
-        SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
+        try (SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
                 SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
-                SequenceFile.Writer.valueClass(NullWritable.class));
+                SequenceFile.Writer.valueClass(NullWritable.class))) {
 
-        for (int i = 0; i < splits.size(); i++) {
-            //when we compare the rowkey, we compare the row firstly.
-            hfilePartitionWriter.append(
-                    new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
-                    NullWritable.get());
+            for (int i = 0; i < splits.size(); i++) {
+                // When rowkeys are compared, the row part is compared first.
+                hfilePartitionWriter.append(
+                        new RowKeyWritable(KeyValueUtil.createFirstOnRow(splits.get(i), Long.MAX_VALUE).createKeyOnly(false).getKey()),
+                        NullWritable.get());
+            }
         }
-        hfilePartitionWriter.close();
     }
 
     public static void main(String[] args) throws Exception {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
deleted file mode 100644
index 47f4c58..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.util;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Locale;
-import java.util.Random;
-import java.util.TimeZone;
-import java.util.concurrent.Semaphore;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class HbaseStreamingInput {
-    private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
-
-    private static final int CELL_SIZE = 128 * 1024; // 128 KB
-    private static final byte[] CF = "F".getBytes(StandardCharsets.UTF_8);
-    private static final byte[] QN = "C".getBytes(StandardCharsets.UTF_8);
-
-    public static void createTable(String tableName) throws IOException {
-        Connection conn = getConnection();
-        Admin hadmin = conn.getAdmin();
-
-        try {
-            boolean tableExist = hadmin.tableExists(TableName.valueOf(tableName));
-            if (tableExist) {
-                logger.info("HTable '" + tableName + "' already exists");
-                return;
-            }
-
-            logger.info("Creating HTable '" + tableName + "'");
-            HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
-            desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());//disable region split
-            desc.setMemStoreFlushSize(512 << 20);//512M
-
-            HColumnDescriptor fd = new HColumnDescriptor(CF);
-            fd.setBlocksize(CELL_SIZE);
-            desc.addFamily(fd);
-            hadmin.createTable(desc);
-
-            logger.info("HTable '" + tableName + "' created");
-        } finally {
-            IOUtils.closeQuietly(conn);
-            IOUtils.closeQuietly(hadmin);
-        }
-    }
-
-    private static void scheduleJob(Semaphore semaphore, int interval) {
-        while (true) {
-            semaphore.release();
-            try {
-                Thread.sleep(interval);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-    }
-
-    public static void addData(String tableName) throws IOException {
-
-        createTable(tableName);
-
-        final Semaphore semaphore = new Semaphore(0);
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                scheduleJob(semaphore, 300000);//5 minutes a batch
-            }
-        }).start();
-
-        while (true) {
-            try {
-                semaphore.acquire();
-                int waiting = semaphore.availablePermits();
-                if (waiting > 0) {
-                    logger.warn("There are another " + waiting + " batches waiting to be added");
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                e.printStackTrace();
-            }
-
-            Connection conn = getConnection();
-            Table table = conn.getTable(TableName.valueOf(tableName));
-
-            byte[] key = new byte[8 + 4];//time + id
-
-            logger.info("============================================");
-            long startTime = System.currentTimeMillis();
-            logger.info("data load start time in millis: " + startTime);
-            logger.info("data load start at " + formatTime(startTime));
-            List<Put> buffer = Lists.newArrayList();
-            for (int i = 0; i < (1 << 10); ++i) {
-                long time = System.currentTimeMillis();
-                Bytes.putLong(key, 0, time);
-                Bytes.putInt(key, 8, i);
-                Put put = new Put(key);
-                byte[] cell = randomBytes(CELL_SIZE);
-                put.addColumn(CF, QN, cell);
-                buffer.add(put);
-            }
-            table.put(buffer);
-            table.close();
-            conn.close();
-            long endTime = System.currentTimeMillis();
-            logger.info("data load end at " + formatTime(endTime));
-            logger.info("data load time consumed: " + (endTime - startTime));
-            logger.info("============================================");
-        }
-    }
-
-    public static void randomScan(String tableName) throws IOException {
-
-        final Semaphore semaphore = new Semaphore(0);
-        new Thread(new Runnable() {
-            @Override
-            public void run() {
-                scheduleJob(semaphore, 60000);//1 minutes a batch
-            }
-        }).start();
-
-        while (true) {
-            try {
-                semaphore.acquire();
-                int waiting = semaphore.drainPermits();
-                if (waiting > 0) {
-                    logger.warn("Too many queries to handle! Blocking " + waiting + " sets of scan requests");
-                }
-            } catch (InterruptedException e) {
-                Thread.currentThread().interrupt();
-                e.printStackTrace();
-            }
-
-            Random r = new Random();
-            Connection conn = getConnection();
-            Table table = conn.getTable(TableName.valueOf(tableName));
-
-            long leftBound = getFirstKeyTime(table);
-            long rightBound = System.currentTimeMillis();
-
-            for (int t = 0; t < 5; ++t) {
-                long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound));
-                long end = start + 600000;//a period of 10 minutes
-                logger.info("A scan from " + formatTime(start) + " to " + formatTime(end));
-
-                Scan scan = new Scan();
-                scan.setStartRow(Bytes.toBytes(start));
-                scan.setStopRow(Bytes.toBytes(end));
-                scan.addFamily(CF);
-                ResultScanner scanner = table.getScanner(scan);
-                long hash = 0;
-                int rowCount = 0;
-                for (Result result : scanner) {
-                    Cell cell = result.getColumnLatestCell(CF, QN);
-                    byte[] value = cell.getValueArray();
-                    if (cell.getValueLength() != CELL_SIZE) {
-                        logger.error("value size invalid!!!!!");
-                    }
-
-                    hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(),
-                            cell.getValueLength() + cell.getValueOffset()));
-                    rowCount++;
-                }
-                scanner.close();
-                logger.info("Scanned " + rowCount + " rows, the (meaningless) hash for the scan is " + hash);
-            }
-            table.close();
-            conn.close();
-        }
-    }
-
-    private static long getFirstKeyTime(Table table) throws IOException {
-        long startTime = 0;
-
-        Scan scan = new Scan();
-        scan.addFamily(CF);
-        ResultScanner scanner = table.getScanner(scan);
-        for (Result result : scanner) {
-            Cell cell = result.getColumnLatestCell(CF, QN);
-            byte[] key = cell.getRowArray();
-            startTime = Bytes.toLong(key, cell.getRowOffset(), 8);
-            logger.info("Retrieved first record time: " + formatTime(startTime));
-            break;//only get first one
-        }
-        scanner.close();
-        return startTime;
-
-    }
-
-    private static Connection getConnection() throws IOException {
-        return HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
-    }
-
-    private static String formatTime(long time) {
-        DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.ROOT);
-        Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("GMT"), Locale.ROOT);
-        cal.setTimeInMillis(time);
-        return dateFormat.format(cal.getTime());
-    }
-
-    private static byte[] randomBytes(int lenth) {
-        byte[] bytes = new byte[lenth];
-        Random rand = new Random();
-        rand.nextBytes(bytes);
-        return bytes;
-    }
-
-    public static void main(String[] args) throws Exception {
-
-        if (args[0].equalsIgnoreCase("createtable")) {
-            createTable(args[1]);
-        } else if (args[0].equalsIgnoreCase("adddata")) {
-            addData(args[1]);
-        } else if (args[0].equalsIgnoreCase("randomscan")) {
-            randomScan(args[1]);
-        }
-    }
-
-}