You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/02/20 16:21:05 UTC
[kylin] 02/06: minor, Math operands should be cast before assignment
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 8c78e7c96d095bc30b8fd7ec0745f7a49cf8658e
Author: etherge <et...@163.com>
AuthorDate: Fri Feb 14 18:27:18 2020 -0500
minor, Math operands should be cast before assignment
---
.../persistence/ExponentialBackoffRetry.java | 2 +-
.../cube/cuboid/algorithm/BPUSCalculator.java | 2 +-
.../cube/cuboid/algorithm/CuboidStatsUtil.java | 4 +-
.../dict/lookup/cache/RocksDBLookupTableCache.java | 46 +++++++++-------
.../kylin/metrics/lib/impl/BlockingReservoir.java | 9 ++--
.../org/apache/kylin/storage/StorageContext.java | 2 +-
.../kylin/engine/mr/streaming/ColumnToRowJob.java | 2 +-
.../apache/kylin/query/relnode/OLAPProjectRel.java | 2 +-
.../kylin/rest/controller/QueryController.java | 2 +-
.../apache/kylin/rest/job/KylinHealthCheckJob.java | 15 +++---
.../java/org/apache/kylin/source/jdbc/SqlUtil.java | 3 +-
.../kylin/storage/hbase/util/CubeMigrationCLI.java | 61 +++++++++++++++-------
.../kylin/stream/coordinator/Coordinator.java | 38 +++++++-------
.../coordinate/StreamingCoordinator.java | 3 +-
.../storage/columnar/ColumnarSegmentStore.java | 2 +-
.../columnar/FSInputGeneralColumnDataReader.java | 2 +-
.../stream/server/ReplicaSetLeaderSelector.java | 2 +-
.../org/apache/kylin/tool/CubeMigrationCLI.java | 6 +--
18 files changed, 119 insertions(+), 84 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/persistence/ExponentialBackoffRetry.java b/core-common/src/main/java/org/apache/kylin/common/persistence/ExponentialBackoffRetry.java
index 315c51e..d06337b 100644
--- a/core-common/src/main/java/org/apache/kylin/common/persistence/ExponentialBackoffRetry.java
+++ b/core-common/src/main/java/org/apache/kylin/common/persistence/ExponentialBackoffRetry.java
@@ -101,7 +101,7 @@ public class ExponentialBackoffRetry {
if (retryCount == 0)
firstSleepTime = System.currentTimeMillis();
- long ms = baseSleepTimeMs * (1 << retryCount);
+ long ms = baseSleepTimeMs * (1L << retryCount);
if (ms > maxSleepTimeMs)
ms = maxSleepTimeMs;
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java
index 39c52da..3041af9 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/BPUSCalculator.java
@@ -120,7 +120,7 @@ public class BPUSCalculator implements BenefitPolicy {
protected double getCostSaving(long descendant, long cuboid) {
long cuboidCost = getCuboidCost(cuboid);
long descendantAggCost = getCuboidAggregationCost(descendant);
- return descendantAggCost - cuboidCost;
+ return (double) descendantAggCost - cuboidCost;
}
protected Long getCuboidCost(long cuboid) {
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java
index d9aaf54..1f9914b 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/cuboid/algorithm/CuboidStatsUtil.java
@@ -125,7 +125,7 @@ public class CuboidStatsUtil {
nEffective++;
}
}
-
+
if (nEffective != 0)
srcCuboidsStats.put(cuboid, totalEstRowCount / nEffective);
else
@@ -349,7 +349,7 @@ public class CuboidStatsUtil {
}
private static double calculateRollupRatio(Pair<Long, Long> rollupStats) {
- double rollupInputCount = rollupStats.getFirst() + rollupStats.getSecond();
+ double rollupInputCount = (double) rollupStats.getFirst() + rollupStats.getSecond();
return rollupInputCount == 0 ? 0 : 1.0 * rollupStats.getFirst() / rollupInputCount;
}
}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
index c748e18..bbcaaf3 100644
--- a/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/lookup/cache/RocksDBLookupTableCache.java
@@ -200,13 +200,14 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache {
private void initExecutors() {
this.cacheBuildExecutor = new ThreadPoolExecutor(0, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(), new NamedThreadFactory("lookup-cache-build-thread"));
- this.cacheStateCheckExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory(
- "lookup-cache-state-checker"));
- cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60, TimeUnit.SECONDS); // check every 10 minutes
+ this.cacheStateCheckExecutor = Executors
+ .newSingleThreadScheduledExecutor(new NamedThreadFactory("lookup-cache-state-checker"));
+ cacheStateCheckExecutor.scheduleAtFixedRate(cacheStateChecker, 10, 10 * 60L, TimeUnit.SECONDS); // check every 10 minutes
}
@Override
- public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo, boolean buildIfNotExist) {
+ public ILookupTable getCachedLookupTable(TableDesc tableDesc, ExtTableSnapshotInfo extTableSnapshotInfo,
+ boolean buildIfNotExist) {
String resourcePath = extTableSnapshotInfo.getResourcePath();
if (inBuildingTables.containsKey(resourcePath)) {
logger.info("cache is in building for snapshot:" + resourcePath);
@@ -215,7 +216,8 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache {
CachedTableInfo cachedTableInfo = tablesCache.getIfPresent(resourcePath);
if (cachedTableInfo == null) {
if (buildIfNotExist) {
- buildSnapshotCache(tableDesc, extTableSnapshotInfo, getSourceLookupTable(tableDesc, extTableSnapshotInfo));
+ buildSnapshotCache(tableDesc, extTableSnapshotInfo,
+ getSourceLookupTable(tableDesc, extTableSnapshotInfo));
}
logger.info("no available cache ready for the table snapshot:" + extTableSnapshotInfo.getResourcePath());
return null;
@@ -231,14 +233,16 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache {
}
@Override
- public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo, final ILookupTable sourceTable) {
+ public void buildSnapshotCache(final TableDesc tableDesc, final ExtTableSnapshotInfo extTableSnapshotInfo,
+ final ILookupTable sourceTable) {
if (extTableSnapshotInfo.getSignature().getSize() / 1024 > maxCacheSizeInKB * 2 / 3) {
logger.warn("the size is to large to build to cache for snapshot:{}, size:{}, skip cache building",
extTableSnapshotInfo.getResourcePath(), extTableSnapshotInfo.getSignature().getSize());
return;
}
final String[] keyColumns = extTableSnapshotInfo.getKeyColumns();
- final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
+ final String cachePath = getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
+ extTableSnapshotInfo.getId());
final String dbPath = getSnapshotStorePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId());
final String snapshotResPath = extTableSnapshotInfo.getResourcePath();
@@ -278,8 +282,8 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache {
if (inBuildingTables.containsKey(resourcePath)) {
return CacheState.IN_BUILDING;
}
- File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
- extTableSnapshotInfo.getId()));
+ File stateFile = getCacheStateFile(
+ getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId()));
if (!stateFile.exists()) {
return CacheState.NONE;
}
@@ -301,14 +305,14 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache {
}
private void saveSnapshotCacheState(ExtTableSnapshotInfo extTableSnapshotInfo, String cachePath) {
- File stateFile = getCacheStateFile(getSnapshotCachePath(extTableSnapshotInfo.getTableName(),
- extTableSnapshotInfo.getId()));
+ File stateFile = getCacheStateFile(
+ getSnapshotCachePath(extTableSnapshotInfo.getTableName(), extTableSnapshotInfo.getId()));
try {
Files.write(CacheState.AVAILABLE.name(), stateFile, Charsets.UTF_8);
tablesCache.put(extTableSnapshotInfo.getResourcePath(), new CachedTableInfo(cachePath));
} catch (IOException e) {
- throw new RuntimeException("error when write cache state for snapshot:"
- + extTableSnapshotInfo.getResourcePath());
+ throw new RuntimeException(
+ "error when write cache state for snapshot:" + extTableSnapshotInfo.getResourcePath());
}
}
@@ -347,17 +351,19 @@ public class RocksDBLookupTableCache implements IExtLookupTableCache {
}
}
- final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config).getAllExtSnapshotResPaths();
+ final Set<String> activeSnapshotSet = ExtTableSnapshotInfoManager.getInstance(config)
+ .getAllExtSnapshotResPaths();
- List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList(FluentIterable.from(
- allCachedSnapshots).filter(new Predicate<Pair<String, File>>() {
- @Override
+ List<Pair<String, File>> toRemovedCachedSnapshots = Lists.newArrayList(
+ FluentIterable.from(allCachedSnapshots).filter(new Predicate<Pair<String, File>>() {
+ @Override
public boolean apply(@Nullable Pair<String, File> input) {
long lastModified = input.getSecond().lastModified();
return !activeSnapshotSet.contains(input.getFirst()) && lastModified > 0
- && lastModified < (System.currentTimeMillis() - config.getExtTableSnapshotLocalCacheCheckVolatileRange());
- }
- }));
+ && lastModified < (System.currentTimeMillis()
+ - config.getExtTableSnapshotLocalCacheCheckVolatileRange());
+ }
+ }));
for (Pair<String, File> toRemovedCachedSnapshot : toRemovedCachedSnapshots) {
File snapshotCacheFolder = toRemovedCachedSnapshot.getSecond();
logger.info("removed cache file:{}, it is not referred by any cube",
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
index d754b19..22f199a 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java
@@ -161,9 +161,10 @@ public class BlockingReservoir extends AbstractActiveReservoir {
startTime = System.currentTimeMillis();
continue;
} else if (size() < minReportSize && (System.currentTimeMillis() - startTime < maxReportTime)) {
- logger.info("The number of records in the blocking queue is less than {} and " +
- "the duration from last reporting is less than {} ms. " +
- "Will delay to report!", minReportSize, maxReportTime);
+ logger.info(
+ "The number of records in the blocking queue is less than {} and "
+ + "the duration from last reporting is less than {} ms. " + "Will delay to report!",
+ minReportSize, maxReportTime);
sleep();
continue;
}
@@ -177,7 +178,7 @@ public class BlockingReservoir extends AbstractActiveReservoir {
private void sleep() {
try {
- Thread.sleep(60 * 1000);
+ Thread.sleep(60 * 1000L);
} catch (InterruptedException e) {
logger.warn("Interrupted during running");
}
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
index 5d2d06f..f763605 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/StorageContext.java
@@ -155,7 +155,7 @@ public class StorageContext {
return;
}
- long temp = this.getOffset() + this.getLimit();
+ long temp = this.getOffset() + (long) this.getLimit();
if (!isValidPushDownLimit(temp)) {
logger.warn("Not enabling limit push down because current limit is invalid: " + this.getLimit());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnToRowJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnToRowJob.java
index 68070eb..2ca4ce4 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnToRowJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/streaming/ColumnToRowJob.java
@@ -43,7 +43,7 @@ import org.slf4j.LoggerFactory;
public class ColumnToRowJob extends AbstractHadoopJob {
private static final Logger logger = LoggerFactory.getLogger(ColumnToRowJob.class);
- private static final long DEFAULT_SIZE_PER_REDUCER = 16 * 1024 * 1024;
+ private static final long DEFAULT_SIZE_PER_REDUCER = 16 * 1024 * 1024L;
private static final int MAX_REDUCERS = 1000;
@Override
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java
index 8be7249..155a586 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPProjectRel.java
@@ -136,7 +136,7 @@ public class OLAPProjectRel extends Project implements OLAPRel {
public RelOptCost computeSelfCost(RelOptPlanner planner, RelMetadataQuery mq) {
boolean hasRexOver = RexOver.containsOver(getProjects(), null);
RelOptCost relOptCost = super.computeSelfCost(planner, mq).multiplyBy(.05)
- .multiplyBy(getProjects().size() * (hasRexOver ? 50 : 1))
+ .multiplyBy(getProjects().size() * (double) (hasRexOver ? 50 : 1))
.plus(planner.getCostFactory().makeCost(0.1 * caseCount, 0, 0));
return planner.getCostFactory().makeCost(relOptCost.getRows(), 0, 0);
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
index da0a1e5..07a1da9 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/QueryController.java
@@ -208,7 +208,7 @@ public class QueryController extends BasicController {
if (runTimeMoreThan == -1) {
return QueryContextFacade.getAllRunningQueries();
} else {
- return QueryContextFacade.getLongRunningQueries(runTimeMoreThan * 1000);
+ return QueryContextFacade.getLongRunningQueries(runTimeMoreThan * 1000L);
}
}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
index ec2d263..ea3bbc2 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/job/KylinHealthCheckJob.java
@@ -18,7 +18,12 @@
package org.apache.kylin.rest.job;
-import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+import java.util.Locale;
+
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -47,11 +52,7 @@ import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.text.SimpleDateFormat;
-import java.util.Date;
-import java.util.List;
-import java.util.Locale;
+import com.google.common.collect.Lists;
public class KylinHealthCheckJob extends AbstractApplication {
private static final Logger logger = LoggerFactory.getLogger(KylinHealthCheckJob.class);
@@ -288,7 +289,7 @@ public class KylinHealthCheckJob extends AbstractApplication {
long sizeRecordSize = cube.getInputRecordSizeBytes();
if (sizeRecordSize > 0) {
long cubeDataSize = cube.getSizeKB() * 1024;
- double expansionRate = cubeDataSize / sizeRecordSize;
+ double expansionRate = (double) cubeDataSize / sizeRecordSize;
if (sizeRecordSize > 1L * expansionCheckMinCubeSizeInGb * 1024 * 1024 * 1024) {
if (expansionRate > warningExpansionRate) {
logger.info("Cube: {} in project: {} with too large expansion rate: {}, cube data size: {}G",
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
index 9299d78..ea3d0f1 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/SqlUtil.java
@@ -23,6 +23,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Random;
+
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.source.hive.DBConnConf;
import org.slf4j.Logger;
@@ -84,7 +85,7 @@ public class SqlUtil {
logger.warn("while use:" + dbconf, e);
try {
int rt = r.nextInt(10);
- Thread.sleep(rt * 1000);
+ Thread.sleep(rt * 1000L);
} catch (InterruptedException e1) {
Thread.interrupted();
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
index 0bd60d5..ee05178 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/CubeMigrationCLI.java
@@ -97,7 +97,8 @@ public class CubeMigrationCLI {
private static final String ACL_INFO_FAMILY_PARENT_COLUMN = "p";
public static void main(String[] args) throws IOException, InterruptedException {
- logger.warn("org.apache.kylin.storage.hbase.util.CubeMigrationCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCLI instead");
+ logger.warn(
+ "org.apache.kylin.storage.hbase.util.CubeMigrationCLI is deprecated, use org.apache.kylin.tool.CubeMigrationCLI instead");
if (args.length != 8) {
usage();
@@ -108,12 +109,22 @@ public class CubeMigrationCLI {
}
private static void usage() {
- System.out.println("Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute");
- System.out.println(" srcKylinConfigUri: The KylinConfig of the cube’s source \n" + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n" + "cubeName: the name of cube to be migrated. \n" + "projectName: The target project in the target environment.(Make sure it exist) \n" + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n" + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n" + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n" + "realExecute: if false, just print the operations to take, if true, do the real migration. \n");
+ System.out.println(
+ "Usage: CubeMigrationCLI srcKylinConfigUri dstKylinConfigUri cubeName projectName copyAclOrNot purgeOrNot overwriteIfExists realExecute");
+ System.out.println(" srcKylinConfigUri: The KylinConfig of the cube’s source \n"
+ + "dstKylinConfigUri: The KylinConfig of the cube’s new home \n"
+ + "cubeName: the name of cube to be migrated. \n"
+ + "projectName: The target project in the target environment.(Make sure it exist) \n"
+ + "copyAclOrNot: true or false: whether copy cube ACL to target environment. \n"
+ + "purgeOrNot: true or false: whether purge the cube from src server after the migration. \n"
+ + "overwriteIfExists: overwrite cube if it already exists in the target environment. \n"
+ + "realExecute: if false, just print the operations to take, if true, do the real migration. \n");
}
- public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
+ public static void moveCube(KylinConfig srcCfg, KylinConfig dstCfg, String cubeName, String projectName,
+ String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute)
+ throws IOException, InterruptedException {
srcConfig = srcCfg;
srcStore = ResourceStore.getStore(srcConfig);
@@ -163,12 +174,16 @@ public class CubeMigrationCLI {
}
}
- public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl, String purgeAndDisable, String overwriteIfExists, String realExecute) throws IOException, InterruptedException {
+ public static void moveCube(String srcCfgUri, String dstCfgUri, String cubeName, String projectName, String copyAcl,
+ String purgeAndDisable, String overwriteIfExists, String realExecute)
+ throws IOException, InterruptedException {
- moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName, projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute);
+ moveCube(KylinConfig.createInstanceFromUri(srcCfgUri), KylinConfig.createInstanceFromUri(dstCfgUri), cubeName,
+ projectName, copyAcl, purgeAndDisable, overwriteIfExists, realExecute);
}
- public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
+ public static void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix)
+ throws IOException {
CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
checkCLI.execute(cubeName);
}
@@ -198,12 +213,14 @@ public class CubeMigrationCLI {
private static void changeHtableHost(CubeInstance cube) {
for (CubeSegment segment : cube.getSegments()) {
- operations.add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
+ operations
+ .add(new Opt(OptType.CHANGE_HTABLE_HOST, new Object[] { segment.getStorageLocationIdentifier() }));
}
}
private static void copyACL(CubeInstance cube, String projectName) {
- operations.add(new Opt(OptType.COPY_ACL, new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName }));
+ operations.add(new Opt(OptType.COPY_ACL,
+ new Object[] { cube.getUuid(), cube.getDescriptor().getModel().getUuid(), projectName }));
}
private static void copyFilesInMetaStore(CubeInstance cube, String overwriteIfExists) throws IOException {
@@ -213,7 +230,8 @@ public class CubeMigrationCLI {
listCubeRelatedResources(cube, metaItems, dictAndSnapshot);
if (dstStore.exists(cube.getResourcePath()) && !overwriteIfExists.equalsIgnoreCase("true"))
- throw new IllegalStateException("The cube named " + cube.getName() + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
+ throw new IllegalStateException("The cube named " + cube.getName()
+ + " already exists on target metadata store. Use overwriteIfExists to overwrite it");
for (String item : metaItems) {
operations.add(new Opt(OptType.COPY_FILE_IN_META, new Object[] { item }));
@@ -224,7 +242,8 @@ public class CubeMigrationCLI {
}
}
- private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName) throws IOException {
+ private static void addCubeAndModelIntoProject(CubeInstance srcCube, String cubeName, String projectName)
+ throws IOException {
String projectResPath = ProjectInstance.concatResourcePath(projectName);
if (!dstStore.exists(projectResPath))
throw new IllegalStateException("The target project " + projectName + "does not exist");
@@ -236,7 +255,8 @@ public class CubeMigrationCLI {
operations.add(new Opt(OptType.PURGE_AND_DISABLE, new Object[] { cubeName }));
}
- private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource, Set<String> dictAndSnapshot) throws IOException {
+ private static void listCubeRelatedResources(CubeInstance cube, List<String> metaResource,
+ Set<String> dictAndSnapshot) throws IOException {
CubeDesc cubeDesc = cube.getDescriptor();
metaResource.add(cube.getResourcePath());
@@ -443,8 +463,10 @@ public class CubeMigrationCLI {
Table srcAclHtable = null;
Table destAclHtable = null;
try {
- srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl()).getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
- destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
+ srcAclHtable = HBaseConnection.get(srcConfig.getStorageUrl())
+ .getTable(TableName.valueOf(srcConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
+ destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl())
+ .getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
// cube acl
Result result = srcAclHtable.get(new Get(Bytes.toBytes(cubeId)));
@@ -455,8 +477,10 @@ public class CubeMigrationCLI {
byte[] value = CellUtil.cloneValue(cell);
// use the target project uuid as the parent
- if (Bytes.toString(family).equals(ACL_INFO_FAMILY) && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) {
- String valueString = "{\"id\":\"" + projUUID + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}";
+ if (Bytes.toString(family).equals(ACL_INFO_FAMILY)
+ && Bytes.toString(column).equals(ACL_INFO_FAMILY_PARENT_COLUMN)) {
+ String valueString = "{\"id\":\"" + projUUID
+ + "\",\"type\":\"org.apache.kylin.metadata.project.ProjectInstance\"}";
value = Bytes.toBytes(valueString);
}
Put put = new Put(Bytes.toBytes(cubeId));
@@ -531,7 +555,8 @@ public class CubeMigrationCLI {
String modelId = (String) opt.params[1];
Table destAclHtable = null;
try {
- destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl()).getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
+ destAclHtable = HBaseConnection.get(dstConfig.getStorageUrl())
+ .getTable(TableName.valueOf(dstConfig.getMetadataUrlPrefix() + ACL_TABLE_NAME));
destAclHtable.delete(new Delete(Bytes.toBytes(cubeId)));
destAclHtable.delete(new Delete(Bytes.toBytes(modelId)));
@@ -572,7 +597,7 @@ public class CubeMigrationCLI {
if (nRetry > 3) {
throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath);
} else {
- Thread.sleep(sleepTime * nRetry * nRetry);
+ Thread.sleep((long) sleepTime * nRetry * nRetry);
}
}
}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index 938c0b4..a0be4b1 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -37,10 +37,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
+import javax.annotation.Nullable;
+
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.leader.LeaderSelector;
import org.apache.curator.framework.recipes.leader.LeaderSelectorListenerAdapter;
@@ -68,23 +66,23 @@ import org.apache.kylin.stream.coordinator.assign.Assigner;
import org.apache.kylin.stream.coordinator.assign.AssignmentUtil;
import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
import org.apache.kylin.stream.coordinator.assign.CubePartitionRoundRobinAssigner;
+import org.apache.kylin.stream.coordinator.assign.DefaultAssigner;
+import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
-import org.apache.kylin.stream.coordinator.exception.StoreException;
-import org.apache.kylin.stream.coordinator.exception.ClusterStateException.TransactionStep;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException.ClusterState;
+import org.apache.kylin.stream.coordinator.exception.ClusterStateException.TransactionStep;
import org.apache.kylin.stream.coordinator.exception.CoordinateException;
import org.apache.kylin.stream.coordinator.exception.NotLeadCoordinatorException;
-import org.apache.kylin.stream.coordinator.assign.DefaultAssigner;
-import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
-import org.apache.kylin.stream.core.model.CubeAssignment;
-import org.apache.kylin.stream.core.model.ReplicaSet;
-import org.apache.kylin.stream.coordinator.client.CoordinatorClient;
+import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.client.HttpReceiverAdminClient;
import org.apache.kylin.stream.core.client.ReceiverAdminClient;
+import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.model.AssignRequest;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
+import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.Node;
import org.apache.kylin.stream.core.model.PauseConsumersRequest;
+import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.model.ResumeConsumerRequest;
import org.apache.kylin.stream.core.model.SegmentBuildState;
import org.apache.kylin.stream.core.model.StartConsumersRequest;
@@ -97,21 +95,23 @@ import org.apache.kylin.stream.core.source.ISourcePositionHandler;
import org.apache.kylin.stream.core.source.ISourcePositionHandler.MergeStrategy;
import org.apache.kylin.stream.core.source.IStreamingSource;
import org.apache.kylin.stream.core.source.Partition;
-import org.apache.kylin.stream.core.source.StreamingTableSourceInfo;
import org.apache.kylin.stream.core.source.StreamingSourceFactory;
+import org.apache.kylin.stream.core.source.StreamingTableSourceInfo;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.NodeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
import com.google.common.collect.Lists;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import javax.annotation.Nullable;
-
/**
* <pre>
* Each Kylin streaming cluster has at least one coordinator processes/server, coordinator
@@ -1226,8 +1226,8 @@ public class Coordinator implements CoordinatorClient {
private boolean isInOptimize(CubeInstance cube) {
Segments<CubeSegment> readyPendingSegments = cube.getSegments(SegmentStatusEnum.READY_PENDING);
if (readyPendingSegments.size() > 0) {
- logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building",
- cube.getName(), readyPendingSegments);
+ logger.info("The cube {} has READY_PENDING segments {}. It's not allowed for building", cube.getName(),
+ readyPendingSegments);
return true;
}
Segments<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW);
@@ -1240,7 +1240,9 @@ public class Coordinator implements CoordinatorClient {
if (job != null && job instanceof CubingJob) {
CubingJob cubingJob = (CubingJob) job;
if (CubingJob.CubingJobTypeEnum.OPTIMIZE.toString().equals(cubingJob.getJobType())) {
- logger.info("The cube {} is in optimization. It's not allowed to build new segments during optimization.", cube.getName());
+ logger.info(
+ "The cube {} is in optimization. It's not allowed to build new segments during optimization.",
+ cube.getName());
return true;
}
}
@@ -1333,7 +1335,7 @@ public class Coordinator implements CoordinatorClient {
restoreJobStatusChecker();
while (true) {
try {
- Thread.sleep(5 * 60 * 1000);
+ Thread.sleep(5 * 60 * 1000L);
} catch (InterruptedException exception) {
Thread.interrupted();
break;
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
index 20e4947..54553a8 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
@@ -146,7 +146,6 @@ public class StreamingCoordinator implements CoordinatorClient {
clusterStateCheckExecutor.scheduleAtFixedRate(clusterDoctor, 5, 10, TimeUnit.MINUTES);
}
-
/**
* Assign the streaming cube to replica sets. Replica sets is calculated by Assigner.
*
@@ -630,7 +629,7 @@ public class StreamingCoordinator implements CoordinatorClient {
buildJobSubmitter.restore();
while (true) {
try {
- Thread.sleep(5 * 60 * 1000);
+ Thread.sleep(5 * 60 * 1000L);
} catch (InterruptedException exception) {
Thread.interrupted();
break;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
index 5982065..dde1be2 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/ColumnarSegmentStore.java
@@ -247,7 +247,7 @@ public class ColumnarSegmentStore implements IStreamingSegmentStore {
List<DataSegmentFragment> result = Lists.newArrayList();
int originFragmentsNum = allFragments.size();
int minFragments = config.getStreamingMinFragmentsInSegment();
- long maxFragmentSize = config.getStreamingMaxFragmentSizeInMb() * 1024 * 1024;
+ long maxFragmentSize = config.getStreamingMaxFragmentSizeInMb() * 1024 * 1024L;
long toMergeDataSize = 0;
for (int i = 0; i < originFragmentsNum; i++) {
DataSegmentFragment fragment = allFragments.get(i);
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
index bbe06ae..214ece3 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FSInputGeneralColumnDataReader.java
@@ -30,7 +30,7 @@ public class FSInputGeneralColumnDataReader implements ColumnDataReader {
public FSInputGeneralColumnDataReader(FSDataInputStream fsInputStream, int dataStartOffset, int dataLength)
throws IOException {
this.fsInputStream = fsInputStream;
- fsInputStream.seek(dataStartOffset + dataLength - 4);
+ fsInputStream.seek(dataStartOffset + dataLength - 4L);
this.numOfVals = fsInputStream.readInt();
fsInputStream.seek(dataStartOffset);
}
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
index e7bdbde..5419a3b 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/ReplicaSetLeaderSelector.java
@@ -73,7 +73,7 @@ public class ReplicaSetLeaderSelector extends LeaderSelectorListenerAdapter impl
}
while (true) {
try {
- Thread.sleep(5 * 60 * 1000);
+ Thread.sleep(5 * 60 * 1000L);
} catch (InterruptedException exception) {
Thread.interrupted();
break;
diff --git a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
index ce3b203..9212d08 100644
--- a/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/CubeMigrationCLI.java
@@ -201,7 +201,7 @@ public class CubeMigrationCLI extends AbstractApplication {
showOpts();
}
}
-
+
public void checkMigrationSuccess(KylinConfig kylinConfig, String cubeName, Boolean ifFix) throws IOException {
CubeMigrationCheckCLI checkCLI = new CubeMigrationCheckCLI(kylinConfig, ifFix);
checkCLI.execute(cubeName);
@@ -632,7 +632,7 @@ public class CubeMigrationCLI extends AbstractApplication {
}
}
}
-
+
private String renameTableWithinProject(String srcItem) {
if (dstProject != null && srcItem.contains(ResourceStore.TABLE_RESOURCE_ROOT)) {
String tableIdentity = TableDesc.parseResourcePath(srcItem).getTable();
@@ -670,7 +670,7 @@ public class CubeMigrationCLI extends AbstractApplication {
if (nRetry > 3) {
throw new InterruptedException("Cannot rename folder " + srcPath + " to folder " + dstPath);
} else {
- Thread.sleep(sleepTime * nRetry * nRetry);
+ Thread.sleep((long) sleepTime * nRetry * nRetry);
}
}
}