Posted to commits@kylin.apache.org by sh...@apache.org on 2018/09/26 13:32:51 UTC
[kylin] branch master updated: KYLIN-3597 fix sonar issues
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 95f399e KYLIN-3597 fix sonar issues
95f399e is described below
commit 95f399e98957992e9a13fa1e71ee319e3018f246
Author: shaofengshi <sh...@apache.org>
AuthorDate: Wed Sep 26 17:21:31 2018 +0800
KYLIN-3597 fix sonar issues
---
.../common/persistence/HDFSResourceStore.java | 14 +-
.../common/persistence/JDBCConnectionManager.java | 16 +-
.../kylin/common/persistence/JDBCResourceDAO.java | 26 +-
.../common/persistence/JDBCResourceStore.java | 3 +-
.../storage/gtrecord/GTCubeStorageQueryBase.java | 49 ++--
.../kylin/storage/hbase/steps/CreateHTableJob.java | 95 +++-----
.../storage/hbase/util/HbaseStreamingInput.java | 262 ---------------------
7 files changed, 81 insertions(+), 384 deletions(-)
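
Editor's note: most of the changes below replace string-concatenated log messages with SLF4J's parameterized form, which skips message construction when the log level is disabled. A minimal sketch of that pattern follows (class and method names are illustrative only). Note that SLF4J expands only the literal {} anchor; the {0}-style placeholders used in parts of this patch are MessageFormat syntax and are left unexpanded by SLF4J itself.

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingSketch {
        private static final Logger logger = LoggerFactory.getLogger(LoggingSketch.class);

        void warnOnFallback(String path) {
            // Concatenation builds the message string even when WARN is disabled:
            //   logger.warn("Missing path, fall back to " + path);
            // The parameterized form formats the message only if WARN is enabled:
            logger.warn("Missing path, fall back to {}", path);
        }
    }
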
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
index 1739ce0..e5bef40 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/HDFSResourceStore.java
@@ -65,18 +65,18 @@ public class HDFSResourceStore extends ResourceStore {
if (path == null) {
// missing path is not expected, but don't fail it
path = kylinConfig.getHdfsWorkingDirectory() + "tmp_metadata";
- logger.warn("Missing path, fall back to " + path);
+ logger.warn("Missing path, fall back to {0}", path);
}
fs = HadoopUtil.getFileSystem(path);
Path metadataPath = new Path(path);
if (fs.exists(metadataPath) == false) {
- logger.warn("Path not exist in HDFS, create it: " + path);
+ logger.warn("Path not exist in HDFS, create it: {0}", path);
createMetaFolder(metadataPath);
}
hdfsMetaPath = metadataPath;
- logger.info("hdfs meta path : " + hdfsMetaPath.toString());
+ logger.info("hdfs meta path : {0}", hdfsMetaPath.toString());
}
@@ -86,7 +86,7 @@ public class HDFSResourceStore extends ResourceStore {
fs.mkdirs(metaDirName);
}
- logger.info("hdfs meta path created: " + metaDirName.toString());
+ logger.info("hdfs meta path created: {0}", metaDirName.toString());
}
@Override
@@ -159,7 +159,7 @@ public class HDFSResourceStore extends ResourceStore {
Path p = getRealHDFSPath(resPath);
if (fs.exists(p) && fs.isFile(p)) {
if (fs.getFileStatus(p).getLen() == 0) {
- logger.warn("Zero length file: " + p.toString());
+ logger.warn("Zero length file: {0}", p.toString());
}
FSDataInputStream in = fs.open(p);
long t = in.readLong();
@@ -190,9 +190,9 @@ public class HDFSResourceStore extends ResourceStore {
@Override
protected void putResourceImpl(String resPath, InputStream content, long ts) throws IOException {
- logger.trace("res path : " + resPath);
+ logger.trace("res path : {0}", resPath);
Path p = getRealHDFSPath(resPath);
- logger.trace("put resource : " + p.toUri());
+ logger.trace("put resource : {0}", p.toUri());
FSDataOutputStream out = null;
try {
out = fs.create(p, true);
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
index 753601a..5f56de1 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCConnectionManager.java
@@ -42,15 +42,9 @@ public class JDBCConnectionManager {
private static JDBCConnectionManager INSTANCE = null;
- private static Object lock = new Object();
-
- public static JDBCConnectionManager getConnectionManager() {
+ public static synchronized JDBCConnectionManager getConnectionManager() {
if (INSTANCE == null) {
- synchronized (lock) {
- if (INSTANCE == null) {
- INSTANCE = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
- }
- }
+ INSTANCE = new JDBCConnectionManager(KylinConfig.getInstanceFromEnv());
}
return INSTANCE;
}
@@ -67,10 +61,10 @@ public class JDBCConnectionManager {
dataSource = BasicDataSourceFactory.createDataSource(getDbcpProperties());
Connection conn = getConn();
DatabaseMetaData mdm = conn.getMetaData();
- logger.info("Connected to " + mdm.getDatabaseProductName() + " " + mdm.getDatabaseProductVersion());
+ logger.info("Connected to {0} {1}", mdm.getDatabaseProductName(), mdm.getDatabaseProductVersion());
closeQuietly(conn);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new IllegalArgumentException(e);
}
}
@@ -94,7 +88,7 @@ public class JDBCConnectionManager {
ret.remove("passwordEncrypted");
}
- logger.info("Connecting to Jdbc with url:" + ret.get("url") + " by user " + ret.get("username"));
+ logger.info("Connecting to Jdbc with url:{0} by user {1}", ret.get("url"), ret.get("username"));
putIfMissing(ret, "driverClassName", "com.mysql.jdbc.Driver");
putIfMissing(ret, "maxActive", "5");
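
Editor's note: the getConnectionManager() change above drops the hand-rolled double-checked locking in favor of a synchronized accessor. A minimal sketch of the resulting idiom, with illustrative names:

    public class SingletonSketch {
        private static SingletonSketch INSTANCE = null;

        private SingletonSketch() {
        }

        // Synchronizing the accessor makes the lazy initialization thread-safe
        // without a separate lock object or a second null check.
        public static synchronized SingletonSketch getInstance() {
            if (INSTANCE == null) {
                INSTANCE = new SingletonSketch();
            }
            return INSTANCE;
        }
    }
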
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
index a226af6..dce0894 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceDAO.java
@@ -32,8 +32,10 @@ import java.text.FieldPosition;
import java.text.MessageFormat;
import java.util.List;
import java.util.Locale;
+import java.util.NavigableSet;
import java.util.TreeSet;
+import com.google.common.base.Preconditions;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -116,7 +118,7 @@ public class JDBCResourceDAO {
if (fetchContent) {
try {
resource.setContent(getInputStream(resourcePath, rs));
- } catch (Throwable e) {
+ } catch (Exception e) {
if (!isAllowBroken) {
throw new SQLException(e);
}
@@ -147,8 +149,8 @@ public class JDBCResourceDAO {
}
//fetch primary key only
- public TreeSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
- final TreeSet<String> allResourceName = new TreeSet<>();
+ public NavigableSet<String> listAllResource(final String folderPath, final boolean recursive) throws SQLException {
+ final NavigableSet<String> allResourceName = new TreeSet<>();
executeSql(new SqlOperation() {
@Override
public void execute(Connection connection) throws SQLException {
@@ -158,7 +160,7 @@ public class JDBCResourceDAO {
rs = pstat.executeQuery();
while (rs.next()) {
String path = rs.getString(META_TABLE_KEY);
- assert path.startsWith(folderPath);
+ Preconditions.checkState(path.startsWith(folderPath));
if (recursive) {
allResourceName.add(path);
} else {
@@ -192,7 +194,7 @@ public class JDBCResourceDAO {
resource.setTimestamp(rs.getLong(META_TABLE_TS));
try {
resource.setContent(getInputStream(resPath, rs));
- } catch (Throwable e) {
+ } catch (Exception e) {
if (!isAllowBroken) {
throw new SQLException(e);
}
@@ -240,7 +242,7 @@ public class JDBCResourceDAO {
if (!skipHdfs) {
try {
deleteHDFSResourceIfExist(resourcePath);
- } catch (Throwable e) {
+ } catch (Exception e) {
throw new SQLException(e);
}
}
@@ -389,7 +391,7 @@ public class JDBCResourceDAO {
bout = new ByteArrayOutputStream();
IOUtils.copy(resource.getContent(), bout);
return bout.toByteArray();
- } catch (Throwable e) {
+ } catch (Exception e) {
throw new SQLException(e);
} finally {
IOUtils.closeQuietly(bout);
@@ -635,10 +637,10 @@ public class JDBCResourceDAO {
out = redirectFileSystem.create(redirectPath);
out.write(largeColumn);
return redirectPath;
- } catch (Throwable e) {
+ } catch (Exception e) {
try {
rollbackLargeCellFromHdfs(resPath);
- } catch (Throwable ex) {
+ } catch (Exception ex) {
logger.error("fail to roll back resource " + resPath + " in hdfs", ex);
}
throw new SQLException(e);
@@ -659,12 +661,12 @@ public class JDBCResourceDAO {
redirectFileSystem.delete(redirectPath, true);
logger.warn("no backup for hdfs file {} is found, clean it", resPath);
}
- } catch (Throwable e) {
+ } catch (Exception e) {
try {
//last try to delete redirectPath, because we prefer a deleted rather than incomplete
redirectFileSystem.delete(redirectPath, true);
- } catch (Throwable ex) {
+ } catch (Exception ex) {
logger.error("fail to delete resource " + redirectPath + " in hdfs", ex);
}
@@ -679,7 +681,7 @@ public class JDBCResourceDAO {
if (redirectFileSystem.exists(oldPath)) {
redirectFileSystem.delete(oldPath, true);
}
- } catch (Throwable e) {
+ } catch (Exception e) {
logger.warn("error cleaning the backup file for " + redirectPath + ", leave it as garbage", e);
}
}
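
Editor's note: among the JDBCResourceDAO changes above, the assert on the returned key is replaced with Guava's Preconditions.checkState, which is evaluated regardless of whether the JVM runs with assertions enabled. A minimal sketch, with hypothetical values:

    import com.google.common.base.Preconditions;

    public class PreconditionSketch {
        static void checkPath(String path, String folderPath) {
            // Throws IllegalStateException if the condition is false; unlike the
            // assert keyword, it does not depend on the -ea JVM flag.
            Preconditions.checkState(path.startsWith(folderPath),
                    "path %s is outside folder %s", path, folderPath);
        }
    }
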
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
index ea6e231..a0a58cb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/JDBCResourceStore.java
@@ -23,7 +23,6 @@ import java.io.InputStream;
import java.sql.SQLException;
import java.util.List;
import java.util.NavigableSet;
-import java.util.TreeSet;
import org.apache.commons.io.IOUtils;
import org.apache.kylin.common.KylinConfig;
@@ -91,7 +90,7 @@ public class JDBCResourceStore extends ResourceStore {
@Override
protected NavigableSet<String> listResourcesImpl(String folderPath, boolean recursive) throws IOException {
try {
- final TreeSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
+ final NavigableSet<String> result = resourceDAO.listAllResource(makeFolderPath(folderPath), recursive);
return result.isEmpty() ? null : result;
} catch (SQLException e) {
throw new IOException(e);
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
index 269833f..5f4d4be 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java
@@ -90,9 +90,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
List<CubeSegmentScanner> scanners = Lists.newArrayList();
SegmentPruner segPruner = new SegmentPruner(sqlDigest.filter);
for (CubeSegment cubeSeg : segPruner.listSegmentsForQuery(cubeInstance)) {
- CubeSegmentScanner scanner;
-
- scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
+ CubeSegmentScanner scanner = new CubeSegmentScanner(cubeSeg, request.getCuboid(), request.getDimensions(), //
request.getGroups(), request.getDynGroups(), request.getDynGroupExprs(), //
request.getMetrics(), request.getDynFuncs(), //
request.getFilter(), request.getHavingFilter(), request.getContext());
@@ -121,8 +119,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
TupleFilter filter = sqlDigest.filter;
// build dimension & metrics
- Set<TblColRef> dimensions = new LinkedHashSet<TblColRef>();
- Set<FunctionDesc> metrics = new LinkedHashSet<FunctionDesc>();
+ Set<TblColRef> dimensions = new LinkedHashSet<>();
+ Set<FunctionDesc> metrics = new LinkedHashSet<>();
buildDimensionsAndMetrics(sqlDigest, dimensions, metrics);
// all dimensions = groups + other(like filter) dimensions
@@ -136,13 +134,13 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
otherDimsD.removeAll(groupsD);
// identify cuboid
- Set<TblColRef> dimensionsD = new LinkedHashSet<TblColRef>();
+ Set<TblColRef> dimensionsD = new LinkedHashSet<>();
dimensionsD.addAll(groupsD);
dimensionsD.addAll(otherDimsD);
Cuboid cuboid = findCuboid(cubeInstance, dimensionsD, metrics);
context.setCuboid(cuboid);
- // set cuboid to GridTable mapping;
+ // set cuboid to GridTable mapping
boolean noDynamicCols;
// dynamic dimensions
List<TblColRef> dynGroups = Lists.newArrayList(sqlDigest.dynGroupbyColumns.keySet());
@@ -288,7 +286,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
} else if (filter instanceof LogicalTupleFilter && filter.getOperator() == FilterOperatorEnum.AND) {
toCheck = filter.getChildren();
} else {
- return (Set<CompareTupleFilter>) Collections.EMPTY_SET;
+ return Collections.emptySet();
}
Set<CompareTupleFilter> result = Sets.newHashSet();
@@ -308,7 +306,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
private long getQueryFilterMask(Set<TblColRef> filterColumnD) {
long filterMask = 0;
- logger.info("Filter column set for query: " + filterColumnD.toString());
+ logger.info("Filter column set for query: {0}", filterColumnD.toString());
if (filterColumnD.size() != 0) {
RowKeyColDesc[] allColumns = cubeDesc.getRowkey().getRowKeyColumns();
for (int i = 0; i < allColumns.length; i++) {
@@ -317,7 +315,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
}
- logger.info("Filter mask is: " + filterMask);
+ logger.info("Filter mask is: {0}", filterMask);
return filterMask;
}
@@ -433,21 +431,19 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
if (!groupsD.containsAll(cuboid.getColumns().subList(0, size))) {
storageLimitLevel = StorageLimitLevel.LIMIT_ON_RETURN_SIZE;
logger.debug(
- "storageLimitLevel set to LIMIT_ON_RETURN_SIZE because groupD is not clustered at head, groupsD: "
- + groupsD //
- + " with cuboid columns: " + cuboid.getColumns());
+ "storageLimitLevel set to LIMIT_ON_RETURN_SIZE because groupD is not clustered at head, groupsD: {0} with cuboid columns: {1}", groupsD.toString(), cuboid.getColumns().toString());
}
if (!dynGroups.isEmpty()) {
storageLimitLevel = StorageLimitLevel.NO_LIMIT;
- logger.debug("Storage limit push down is impossible because the query has dynamic groupby " + dynGroups);
+ logger.debug("Storage limit push down is impossible because the query has dynamic groupby {0}", dynGroups);
}
// derived aggregation is bad, unless expanded columns are already in group by
if (!groups.containsAll(derivedPostAggregation)) {
storageLimitLevel = StorageLimitLevel.NO_LIMIT;
- logger.debug("storageLimitLevel set to NO_LIMIT because derived column require post aggregation: "
- + derivedPostAggregation);
+ logger.debug("storageLimitLevel set to NO_LIMIT because derived column require post aggregation: {0}",
+ derivedPostAggregation);
}
if (!TupleFilter.isEvaluableRecursively(filter)) {
@@ -457,7 +453,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
if (!loosenedColumnD.isEmpty()) { // KYLIN-2173
storageLimitLevel = StorageLimitLevel.NO_LIMIT;
- logger.debug("storageLimitLevel set to NO_LIMIT because filter is loosened: " + loosenedColumnD);
+ logger.debug("storageLimitLevel set to NO_LIMIT because filter is loosened: {0}", loosenedColumnD);
}
if (context.hasSort()) {
@@ -469,7 +465,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
for (FunctionDesc functionDesc : functionDescs) {
if (functionDesc.isDimensionAsMetric()) {
storageLimitLevel = StorageLimitLevel.NO_LIMIT;
- logger.debug("storageLimitLevel set to NO_LIMIT because {} isDimensionAsMetric ", functionDesc);
+ logger.debug("storageLimitLevel set to NO_LIMIT because {0} isDimensionAsMetric ", functionDesc);
}
}
@@ -488,8 +484,8 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
if (!shardByInGroups.isEmpty()) {
enabled = false;
- logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: "
- + shardByInGroups);
+ logger.debug("Aggregate partition results is not beneficial because shard by columns in groupD: {0}",
+ shardByInGroups);
}
if (!context.isNeedStorageAggregation()) {
@@ -536,7 +532,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
return null;
// OK, push down
- logger.info("Push down having filter " + havingFilter);
+ logger.info("Push down having filter {0}", havingFilter);
// convert columns in the filter
Set<TblColRef> aggrOutCols = new HashSet<>();
@@ -568,21 +564,20 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
if (cuboid.requirePostAggregation()) {
- logger.info("exactAggregation is false because cuboid " + cuboid.getInputID() + "=> " + cuboid.getId());
+ logger.info("exactAggregation is false because cuboid {0}=>{1}", cuboid.getInputID(), cuboid.getId());
return false;
}
// derived aggregation is bad, unless expanded columns are already in group by
if (!groups.containsAll(derivedPostAggregation)) {
- logger.info("exactAggregation is false because derived column require post aggregation: "
- + derivedPostAggregation);
+ logger.info("exactAggregation is false because derived column require post aggregation: {0}",
+ derivedPostAggregation);
return false;
}
// other columns (from filter) is bad, unless they are ensured to have single value
if (!singleValuesD.containsAll(othersD)) {
- logger.info("exactAggregation is false because some column not on group by: " + othersD //
- + " (single value column: " + singleValuesD + ")");
+ logger.info("exactAggregation is false because some column not on group by: {0} (single value column: {1})", othersD, singleValuesD);
return false;
}
@@ -610,7 +605,7 @@ public abstract class GTCubeStorageQueryBase implements IStorageQuery {
}
}
- logger.info("exactAggregation is true, cuboid id is " + cuboid.getId());
+ logger.info("exactAggregation is true, cuboid id is {0}", String.valueOf(cuboid.getId()));
return true;
}
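
Editor's note: the switch from Collections.EMPTY_SET to Collections.emptySet() above avoids an unchecked cast: EMPTY_SET is a raw Set, while emptySet() is generic and infers the element type. A minimal sketch:

    import java.util.Collections;
    import java.util.Set;

    public class EmptySetSketch {
        static Set<String> none() {
            // Infers Set<String>; no (Set<String>) cast or unchecked warning needed.
            return Collections.emptySet();
        }
    }
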
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
index 7205802..ca8da25 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/CreateHTableJob.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2;
import org.apache.hadoop.io.NullWritable;
@@ -52,7 +52,6 @@ import org.apache.kylin.cube.model.CubeDesc;
import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
import org.apache.kylin.engine.mr.common.CubeStatsReader;
import org.apache.kylin.engine.mr.common.CuboidShardUtil;
-import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.slf4j.Logger;
@@ -108,7 +107,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
for (Long cuboid : buildingCuboids) {
Double cuboidSize = cuboidSizeMap.get(cuboid);
if (cuboidSize == null) {
- logger.warn(cuboid + "cuboid's size is null will replace by 0");
+ logger.warn("{0} cuboid's size is null will replace by 0", cuboid);
cuboidSize = 0.0;
}
optimizedCuboidSizeMap.put(cuboid, cuboidSize);
@@ -128,7 +127,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
return 0;
}
- private void exportHBaseConfiguration(String hbaseTableName) throws Exception {
+ private void exportHBaseConfiguration(String hbaseTableName) throws IOException {
Configuration hbaseConf = HBaseConnection.getCurrentHBaseConfiguration();
HadoopUtil.healSickConfig(hbaseConf);
@@ -136,14 +135,12 @@ public class CreateHTableJob extends AbstractHadoopJob {
HTable table = new HTable(hbaseConf, hbaseTableName);
HFileOutputFormat2.configureIncrementalLoadMap(job, table);
- logger.info("Saving HBase configuration to " + hbaseConfPath);
+ logger.info("Saving HBase configuration to {0}", hbaseConfPath);
FileSystem fs = HadoopUtil.getWorkingFileSystem();
FSDataOutputStream out = null;
try {
out = fs.create(new Path(hbaseConfPath));
job.getConfiguration().writeXml(out);
- } catch (IOException e) {
- throw new ExecuteException("Write hbase configuration failed", e);
} finally {
IOUtils.closeQuietly(out);
}
@@ -167,7 +164,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
final CubeDesc cubeDesc = cubeSegment.getCubeDesc();
float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
- logger.info("Cut for HBase region is " + cut + "GB");
+ logger.info("Cut for HBase region is {0} GB", String.valueOf(cut));
double totalSizeInM = 0;
for (Double cuboidSize : cubeSizeMap.values()) {
@@ -182,7 +179,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
- if (cubeSegment.isEnableSharding()) {//&& (nRegion > 1)) {
+ if (cubeSegment.isEnableSharding()) {
//use prime nRegions to help random sharding
int original = nRegion;
if (nRegion == 0) {
@@ -190,22 +187,22 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
if (nRegion > Short.MAX_VALUE) {
- logger.info("Too many regions! reduce to " + Short.MAX_VALUE);
+ logger.info("Too many regions! reduce to {0}" + String.valueOf(Short.MAX_VALUE));
nRegion = Short.MAX_VALUE;
}
if (nRegion != original) {
logger.info(
- "Region count is adjusted from " + original + " to " + nRegion + " to help random sharding");
+ "Region count is adjusted from {0} to {1} to help random sharding", String.valueOf(original), String.valueOf(nRegion));
}
}
int mbPerRegion = (int) (totalSizeInM / nRegion);
mbPerRegion = Math.max(1, mbPerRegion);
- logger.info("Total size " + totalSizeInM + "M (estimated)");
- logger.info("Expecting " + nRegion + " regions.");
- logger.info("Expecting " + mbPerRegion + " MB per region.");
+ logger.info("Total size {0} M (estimated)", String.valueOf(totalSizeInM));
+ logger.info("Expecting {0} regions.", String.valueOf(nRegion));
+ logger.info("Expecting {0} MB per region.", String.valueOf(mbPerRegion));
if (cubeSegment.isEnableSharding()) {
//each cuboid will be split into different number of shards
@@ -247,42 +244,15 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
for (int i = 0; i < nRegion; ++i) {
- logger.info(
- String.format(Locale.ROOT, "Region %d's estimated size is %.2f MB, accounting for %.2f percent",
- i, regionSizes[i], 100.0 * regionSizes[i] / totalSizeInM));
+ logger.info("Region {0}'s estimated size is {1} MB, accounting for {2} percent",
+ String.valueOf(i), String.valueOf(regionSizes[i]), String.valueOf(100.0 * regionSizes[i] / totalSizeInM));
}
CuboidShardUtil.saveCuboidShards(cubeSegment, cuboidShards, nRegion);
saveHFileSplits(innerRegionSplits, mbPerRegion, hfileSplitsOutputFolder, kylinConfig);
return getSplitsByRegionCount(nRegion);
-
} else {
- List<Long> regionSplit = Lists.newArrayList();
-
- long size = 0;
- int regionIndex = 0;
- int cuboidCount = 0;
- for (int i = 0; i < allCuboids.size(); i++) {
- long cuboidId = allCuboids.get(i);
- if (size >= mbPerRegion || (size + cubeSizeMap.get(cuboidId)) >= mbPerRegion * 1.2) {
- // if the size already bigger than threshold, or it will exceed by 20%, cut for next region
- regionSplit.add(cuboidId);
- logger.info("Region " + regionIndex + " will be " + size + " MB, contains cuboids < " + cuboidId
- + " (" + cuboidCount + ") cuboids");
- size = 0;
- cuboidCount = 0;
- regionIndex++;
- }
- size += cubeSizeMap.get(cuboidId);
- cuboidCount++;
- }
-
- byte[][] result = new byte[regionSplit.size()][];
- for (int i = 0; i < regionSplit.size(); i++) {
- result[i] = Bytes.toBytes(regionSplit.get(i));
- }
-
- return result;
+ throw new IllegalStateException("Not supported");
}
}
@@ -308,20 +278,20 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
// keep the tweak for sandbox test
- if (hfileSizeMB > 0.0 && kylinConfig.isDevEnv()) {
- hfileSizeMB = mbPerRegion / 2;
+ if (hfileSizeMB > 0.0f && kylinConfig.isDevEnv()) {
+ hfileSizeMB = mbPerRegion / 2f;
}
- int compactionThreshold = Integer.valueOf(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
- logger.info("hbase.hstore.compactionThreshold is " + compactionThreshold);
- if (hfileSizeMB > 0.0 && hfileSizeMB * compactionThreshold < mbPerRegion) {
- hfileSizeMB = mbPerRegion / compactionThreshold;
+ int compactionThreshold = Integer.parseInt(hbaseConf.get("hbase.hstore.compactionThreshold", "3"));
+ logger.info("hbase.hstore.compactionThreshold is {0}", String.valueOf(compactionThreshold));
+ if (hfileSizeMB > 0.0f && hfileSizeMB * compactionThreshold < mbPerRegion) {
+ hfileSizeMB = ((float) mbPerRegion) / compactionThreshold;
}
- if (hfileSizeMB <= 0) {
+ if (hfileSizeMB <= 0f) {
hfileSizeMB = mbPerRegion;
}
- logger.info("hfileSizeMB:" + hfileSizeMB);
+ logger.info("hfileSizeMB {0}", String.valueOf(hfileSizeMB));
final Path hfilePartitionFile = new Path(outputFolder, "part-r-00000_hfile");
short regionCount = (short) innerRegionSplits.size();
@@ -331,7 +301,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
// skip 0
byte[] split = new byte[RowConstants.ROWKEY_SHARDID_LEN];
BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
- splits.add(split); // split by region;
+ splits.add(split);
}
HashMap<Long, Double> cuboidSize = innerRegionSplits.get(i);
@@ -344,8 +314,7 @@ public class CreateHTableJob extends AbstractHadoopJob {
for (Long cuboid : allCuboids) {
if (accumulatedSize >= hfileSizeMB) {
- logger.info(
- String.format(Locale.ROOT, "Region %d's hfile %d size is %.2f mb", i, j, accumulatedSize));
+ logger.info("Region {0}'s hfile {1} size is {2} mb", String.valueOf(i), String.valueOf(j), String.valueOf(accumulatedSize));
byte[] split = new byte[RowConstants.ROWKEY_SHARD_AND_CUBOID_LEN];
BytesUtil.writeUnsigned(i, split, 0, RowConstants.ROWKEY_SHARDID_LEN);
System.arraycopy(Bytes.toBytes(cuboid), 0, split, RowConstants.ROWKEY_SHARDID_LEN,
@@ -359,17 +328,17 @@ public class CreateHTableJob extends AbstractHadoopJob {
}
- SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
+ try (SequenceFile.Writer hfilePartitionWriter = SequenceFile.createWriter(hbaseConf,
SequenceFile.Writer.file(hfilePartitionFile), SequenceFile.Writer.keyClass(RowKeyWritable.class),
- SequenceFile.Writer.valueClass(NullWritable.class));
+ SequenceFile.Writer.valueClass(NullWritable.class))) {
- for (int i = 0; i < splits.size(); i++) {
- //when we compare the rowkey, we compare the row firstly.
- hfilePartitionWriter.append(
- new RowKeyWritable(KeyValue.createFirstOnRow(splits.get(i)).createKeyOnly(false).getKey()),
- NullWritable.get());
+ for (int i = 0; i < splits.size(); i++) {
+ //when we compare the rowkey, we compare the row firstly.
+ hfilePartitionWriter.append(
+ new RowKeyWritable(KeyValueUtil.createFirstOnRow(splits.get(i), 9223372036854775807L).createKeyOnly(false).getKey()),
+ NullWritable.get());
+ }
}
- hfilePartitionWriter.close();
}
public static void main(String[] args) throws Exception {
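
Editor's note: the SequenceFile.Writer above is now opened in a try-with-resources block, so it is closed even if append() throws. A minimal sketch of the same pattern under assumed Hadoop types (the Text key here is illustrative only):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.SequenceFile;
    import org.apache.hadoop.io.Text;

    public class WriterSketch {
        static void write(Configuration conf, Path file) throws IOException {
            // SequenceFile.Writer implements Closeable, so the writer is closed
            // automatically when the block exits, normally or via an exception.
            try (SequenceFile.Writer writer = SequenceFile.createWriter(conf,
                    SequenceFile.Writer.file(file),
                    SequenceFile.Writer.keyClass(Text.class),
                    SequenceFile.Writer.valueClass(NullWritable.class))) {
                writer.append(new Text("split-key"), NullWritable.get());
            }
        }
    }
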
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
deleted file mode 100644
index 47f4c58..0000000
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/HbaseStreamingInput.java
+++ /dev/null
@@ -1,262 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kylin.storage.hbase.util;
-
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.List;
-import java.util.Locale;
-import java.util.Random;
-import java.util.TimeZone;
-import java.util.concurrent.Semaphore;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.storage.hbase.HBaseConnection;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
-
-/**
- */
-public class HbaseStreamingInput {
- private static final Logger logger = LoggerFactory.getLogger(HbaseStreamingInput.class);
-
- private static final int CELL_SIZE = 128 * 1024; // 128 KB
- private static final byte[] CF = "F".getBytes(StandardCharsets.UTF_8);
- private static final byte[] QN = "C".getBytes(StandardCharsets.UTF_8);
-
- public static void createTable(String tableName) throws IOException {
- Connection conn = getConnection();
- Admin hadmin = conn.getAdmin();
-
- try {
- boolean tableExist = hadmin.tableExists(TableName.valueOf(tableName));
- if (tableExist) {
- logger.info("HTable '" + tableName + "' already exists");
- return;
- }
-
- logger.info("Creating HTable '" + tableName + "'");
- HTableDescriptor desc = new HTableDescriptor(TableName.valueOf(tableName));
- desc.setValue(HTableDescriptor.SPLIT_POLICY, DisabledRegionSplitPolicy.class.getName());//disable region split
- desc.setMemStoreFlushSize(512 << 20);//512M
-
- HColumnDescriptor fd = new HColumnDescriptor(CF);
- fd.setBlocksize(CELL_SIZE);
- desc.addFamily(fd);
- hadmin.createTable(desc);
-
- logger.info("HTable '" + tableName + "' created");
- } finally {
- IOUtils.closeQuietly(conn);
- IOUtils.closeQuietly(hadmin);
- }
- }
-
- private static void scheduleJob(Semaphore semaphore, int interval) {
- while (true) {
- semaphore.release();
- try {
- Thread.sleep(interval);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
-
- public static void addData(String tableName) throws IOException {
-
- createTable(tableName);
-
- final Semaphore semaphore = new Semaphore(0);
- new Thread(new Runnable() {
- @Override
- public void run() {
- scheduleJob(semaphore, 300000);//5 minutes a batch
- }
- }).start();
-
- while (true) {
- try {
- semaphore.acquire();
- int waiting = semaphore.availablePermits();
- if (waiting > 0) {
- logger.warn("There are another " + waiting + " batches waiting to be added");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- e.printStackTrace();
- }
-
- Connection conn = getConnection();
- Table table = conn.getTable(TableName.valueOf(tableName));
-
- byte[] key = new byte[8 + 4];//time + id
-
- logger.info("============================================");
- long startTime = System.currentTimeMillis();
- logger.info("data load start time in millis: " + startTime);
- logger.info("data load start at " + formatTime(startTime));
- List<Put> buffer = Lists.newArrayList();
- for (int i = 0; i < (1 << 10); ++i) {
- long time = System.currentTimeMillis();
- Bytes.putLong(key, 0, time);
- Bytes.putInt(key, 8, i);
- Put put = new Put(key);
- byte[] cell = randomBytes(CELL_SIZE);
- put.addColumn(CF, QN, cell);
- buffer.add(put);
- }
- table.put(buffer);
- table.close();
- conn.close();
- long endTime = System.currentTimeMillis();
- logger.info("data load end at " + formatTime(endTime));
- logger.info("data load time consumed: " + (endTime - startTime));
- logger.info("============================================");
- }
- }
-
- public static void randomScan(String tableName) throws IOException {
-
- final Semaphore semaphore = new Semaphore(0);
- new Thread(new Runnable() {
- @Override
- public void run() {
- scheduleJob(semaphore, 60000);//1 minutes a batch
- }
- }).start();
-
- while (true) {
- try {
- semaphore.acquire();
- int waiting = semaphore.drainPermits();
- if (waiting > 0) {
- logger.warn("Too many queries to handle! Blocking " + waiting + " sets of scan requests");
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- e.printStackTrace();
- }
-
- Random r = new Random();
- Connection conn = getConnection();
- Table table = conn.getTable(TableName.valueOf(tableName));
-
- long leftBound = getFirstKeyTime(table);
- long rightBound = System.currentTimeMillis();
-
- for (int t = 0; t < 5; ++t) {
- long start = (long) (leftBound + r.nextDouble() * (rightBound - leftBound));
- long end = start + 600000;//a period of 10 minutes
- logger.info("A scan from " + formatTime(start) + " to " + formatTime(end));
-
- Scan scan = new Scan();
- scan.setStartRow(Bytes.toBytes(start));
- scan.setStopRow(Bytes.toBytes(end));
- scan.addFamily(CF);
- ResultScanner scanner = table.getScanner(scan);
- long hash = 0;
- int rowCount = 0;
- for (Result result : scanner) {
- Cell cell = result.getColumnLatestCell(CF, QN);
- byte[] value = cell.getValueArray();
- if (cell.getValueLength() != CELL_SIZE) {
- logger.error("value size invalid!!!!!");
- }
-
- hash += Arrays.hashCode(Arrays.copyOfRange(value, cell.getValueOffset(),
- cell.getValueLength() + cell.getValueOffset()));
- rowCount++;
- }
- scanner.close();
- logger.info("Scanned " + rowCount + " rows, the (meaningless) hash for the scan is " + hash);
- }
- table.close();
- conn.close();
- }
- }
-
- private static long getFirstKeyTime(Table table) throws IOException {
- long startTime = 0;
-
- Scan scan = new Scan();
- scan.addFamily(CF);
- ResultScanner scanner = table.getScanner(scan);
- for (Result result : scanner) {
- Cell cell = result.getColumnLatestCell(CF, QN);
- byte[] key = cell.getRowArray();
- startTime = Bytes.toLong(key, cell.getRowOffset(), 8);
- logger.info("Retrieved first record time: " + formatTime(startTime));
- break;//only get first one
- }
- scanner.close();
- return startTime;
-
- }
-
- private static Connection getConnection() throws IOException {
- return HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl());
- }
-
- private static String formatTime(long time) {
- DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss", Locale.ROOT);
- Calendar cal = Calendar.getInstance(TimeZone.getTimeZone("GMT"), Locale.ROOT);
- cal.setTimeInMillis(time);
- return dateFormat.format(cal.getTime());
- }
-
- private static byte[] randomBytes(int lenth) {
- byte[] bytes = new byte[lenth];
- Random rand = new Random();
- rand.nextBytes(bytes);
- return bytes;
- }
-
- public static void main(String[] args) throws Exception {
-
- if (args[0].equalsIgnoreCase("createtable")) {
- createTable(args[1]);
- } else if (args[0].equalsIgnoreCase("adddata")) {
- addData(args[1]);
- } else if (args[0].equalsIgnoreCase("randomscan")) {
- randomScan(args[1]);
- }
- }
-
-}