You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/11/27 14:43:50 UTC
[kylin] branch master updated: KYLIN-3597 fix static code issues
This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new 7bcc698 KYLIN-3597 fix static code issues
7bcc698 is described below
commit 7bcc6987e5bbcfd57f320aa19c4fba67ed02aaea
Author: shaofengshi <sh...@apache.org>
AuthorDate: Tue Nov 27 18:41:24 2018 +0800
KYLIN-3597 fix static code issues
fix UT
---
.../java/org/apache/kylin/common/util/Bytes.java | 47 ++++-----
.../validation/rule/AggregationGroupRule.java | 28 +++---
.../kylin/job/execution/AbstractExecutable.java | 27 ++---
.../BitmapIntersectDistinctCountAggFunc.java | 8 +-
.../kylin/metadata/cachesync/Broadcaster.java | 23 ++---
.../datasource/adaptor/AbstractJdbcAdaptor.java | 10 +-
.../sdk/datasource/adaptor/DefaultAdaptor.java | 3 +-
.../apache/kylin/engine/mr/ByteArrayWritable.java | 14 +--
.../mr/steps/RowKeyDistributionCheckerJob.java | 93 -----------------
.../mr/steps/RowKeyDistributionCheckerMapper.java | 112 ---------------------
.../mr/steps/RowKeyDistributionCheckerReducer.java | 51 ----------
.../broadcaster/BroadcasterReceiveServlet.java | 9 +-
.../kylin/source/jdbc/extensible/JdbcExplorer.java | 88 ++++++++--------
.../source/kafka/util/KafkaSampleProducer.java | 12 +--
14 files changed, 124 insertions(+), 401 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
index 33fc31a..a267921 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/Bytes.java
@@ -41,7 +41,6 @@ import java.util.Locale;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
import sun.misc.Unsafe;
/**
@@ -116,6 +115,7 @@ public class Bytes {
// JHat says BU is 56 bytes.
// SizeOf which uses java.lang.instrument says 24 bytes. (3 longs?)
public static final int ESTIMATED_HEAP_TAX = 16;
+ public static final String LENGTH_MUST_BE_GREATER_THAN_0 = "length must be greater than 0";
/**
* Returns length of the byte array, returning 0 if the array is null.
@@ -491,7 +491,7 @@ public class Bytes {
* @return incremented offset
*/
public static int putLongUnsafe(byte[] bytes, int offset, long val) {
- if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+ if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
val = Long.reverseBytes(val);
}
org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe.putLong(bytes,
@@ -652,7 +652,7 @@ public class Bytes {
* @return the int value
*/
public static int toIntUnsafe(byte[] bytes, int offset) {
- if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+ if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
return Integer.reverseBytes(
org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe
.getInt(bytes, (long) offset
@@ -672,7 +672,7 @@ public class Bytes {
* @return the short value
*/
public static short toShortUnsafe(byte[] bytes, int offset) {
- if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+ if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
return Short.reverseBytes(
org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe
.getShort(bytes, (long) offset
@@ -692,7 +692,7 @@ public class Bytes {
* @return the long value
*/
public static long toLongUnsafe(byte[] bytes, int offset) {
- if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+ if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
return Long.reverseBytes(
org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe
.getLong(bytes, (long) offset
@@ -763,7 +763,7 @@ public class Bytes {
* @return incremented offset
*/
public static int putIntUnsafe(byte[] bytes, int offset, int val) {
- if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+ if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
val = Integer.reverseBytes(val);
}
org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe.putInt(bytes,
@@ -880,7 +880,7 @@ public class Bytes {
* @return incremented offset
*/
public static int putShortUnsafe(byte[] bytes, int offset, short val) {
- if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.littleEndian) {
+ if (org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.LITTLEENDIAN) {
val = Short.reverseBytes(val);
}
org.apache.kylin.common.util.Bytes.LexicographicalComparerHolder.UnsafeComparer.theUnsafe.putShort(bytes,
@@ -1097,7 +1097,7 @@ public class Bytes {
}
}
- static final boolean littleEndian = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
+ static final boolean LITTLEENDIAN = ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN);
/**
* Returns true if x1 is less than x2, when both values are treated as
@@ -1164,7 +1164,7 @@ public class Bytes {
long lw = theUnsafe.getLong(buffer1, offset1Adj + (long) i);
long rw = theUnsafe.getLong(buffer2, offset2Adj + (long) i);
long diff = lw ^ rw;
- if (littleEndian) {
+ if (LITTLEENDIAN) {
lw = Long.reverseBytes(lw);
rw = Long.reverseBytes(rw);
}
@@ -1177,7 +1177,7 @@ public class Bytes {
if (minLength - offset >= SIZEOF_INT) {
int il = theUnsafe.getInt(buffer1, offset1Adj + offset);
int ir = theUnsafe.getInt(buffer2, offset2Adj + offset);
- if (littleEndian) {
+ if (LITTLEENDIAN) {
il = Integer.reverseBytes(il);
ir = Integer.reverseBytes(ir);
}
@@ -1189,7 +1189,7 @@ public class Bytes {
if (minLength - offset >= SIZEOF_SHORT) {
short sl = theUnsafe.getShort(buffer1, offset1Adj + offset);
short sr = theUnsafe.getShort(buffer2, offset2Adj + offset);
- if (littleEndian) {
+ if (LITTLEENDIAN) {
sl = Short.reverseBytes(sl);
sr = Short.reverseBytes(sr);
}
@@ -1316,10 +1316,7 @@ public class Bytes {
/** Compute hash for binary data. */
public static int hashBytes(byte[] bytes, int offset, int length) {
- int hash = 1;
- for (int i = offset; i < offset + length; i++)
- hash = (31 * hash) + (int) bytes[i];
- return hash;
+ return hashCode(bytes, offset, length);
}
/**
@@ -1537,12 +1534,7 @@ public class Bytes {
};
- return new Iterable<byte[]>() {
- @Override
- public Iterator<byte[]> iterator() {
- return iterator;
- }
- };
+ return () -> iterator;
}
/**
@@ -1792,10 +1784,7 @@ public class Bytes {
public static boolean equals(List<byte[]> a, List<byte[]> b) {
if (a == null) {
- if (b == null) {
- return true;
- }
- return false;
+ return b == null;
}
if (b == null) {
return false;
@@ -1894,7 +1883,7 @@ public class Bytes {
*/
public static void zero(byte[] b, int offset, int length) {
checkPositionIndex(offset, b.length, "offset");
- checkArgument(length > 0, "length must be greater than 0");
+ checkArgument(length > 0, LENGTH_MUST_BE_GREATER_THAN_0);
checkPositionIndex(offset + length, b.length, "offset + length");
Arrays.fill(b, offset, offset + length, (byte) 0);
}
@@ -1919,7 +1908,7 @@ public class Bytes {
*/
public static void random(byte[] b, int offset, int length) {
checkPositionIndex(offset, b.length, "offset");
- checkArgument(length > 0, "length must be greater than 0");
+ checkArgument(length > 0, LENGTH_MUST_BE_GREATER_THAN_0);
checkPositionIndex(offset + length, b.length, "offset + length");
byte[] buf = new byte[length];
RNG.nextBytes(buf);
@@ -1964,7 +1953,7 @@ public class Bytes {
* @param b
*/
public static String toHex(byte[] b) {
- checkArgument(b.length > 0, "length must be greater than 0");
+ checkArgument(b.length > 0, LENGTH_MUST_BE_GREATER_THAN_0);
return String.format(Locale.ROOT, "%x", new BigInteger(1, b));
}
@@ -1975,7 +1964,7 @@ public class Bytes {
* @param hex
*/
public static byte[] fromHex(String hex) {
- checkArgument(hex.length() > 0, "length must be greater than 0");
+ checkArgument(hex.length() > 0, LENGTH_MUST_BE_GREATER_THAN_0);
checkArgument(hex.length() % 2 == 0, "length must be a multiple of 2");
// Make sure letters are upper case
hex = hex.toUpperCase(Locale.ROOT);
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/AggregationGroupRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/AggregationGroupRule.java
index 2b0358d..4670ac1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/AggregationGroupRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/AggregationGroupRule.java
@@ -40,17 +40,16 @@ import com.google.common.collect.Lists;
*/
public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
+ public static final String AGGREGATION_GROUP = "Aggregation group ";
+
@Override
public void validate(CubeDesc cube, ValidateContext context) {
inner(cube, context);
}
- public AggregationGroupRule() {
- }
-
private void inner(CubeDesc cube, ValidateContext context) {
- if (cube.getAggregationGroups() == null || cube.getAggregationGroups().size() == 0) {
+ if (cube.getAggregationGroups() == null || cube.getAggregationGroups().isEmpty()) {
context.addResult(ResultLevel.ERROR, "Cube should have at least one Aggregation group.");
return;
}
@@ -58,12 +57,12 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
int index = 1;
for (AggregationGroup agg : cube.getAggregationGroups()) {
if (agg.getIncludes() == null) {
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " 'includes' field not set");
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " 'includes' field not set");
continue;
}
if (agg.getSelectRule() == null) {
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " 'select rule' field not set");
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " 'select rule' field not set");
continue;
}
@@ -107,20 +106,20 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
notIncluded.add(dim);
}
}
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " 'includes' dimensions not include all the dimensions:" + notIncluded.toString());
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " 'includes' dimensions not include all the dimensions:" + notIncluded.toString());
continue;
}
if (CollectionUtils.containsAny(mandatoryDims, hierarchyDims)) {
Set<String> intersection = new HashSet<>(mandatoryDims);
intersection.retainAll(hierarchyDims);
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " mandatory dimension has overlap with hierarchy dimension: " + intersection.toString());
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " mandatory dimension has overlap with hierarchy dimension: " + intersection.toString());
continue;
}
if (CollectionUtils.containsAny(mandatoryDims, jointDims)) {
Set<String> intersection = new HashSet<>(mandatoryDims);
intersection.retainAll(jointDims);
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " mandatory dimension has overlap with joint dimension: " + intersection.toString());
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " mandatory dimension has overlap with joint dimension: " + intersection.toString());
continue;
}
@@ -134,7 +133,7 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
}
if (oneJoint.size() < 2) {
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " require at least 2 dimensions in a joint: " + oneJoint.toString());
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " require at least 2 dimensions in a joint: " + oneJoint.toString());
continue;
}
jointDimNum += oneJoint.size();
@@ -149,13 +148,13 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
overlapHierarchies++;
}
if (share.size() > 1) {
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " joint dimensions has overlap with more than 1 dimensions in same hierarchy: " + share.toString());
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " joint dimensions has overlap with more than 1 dimensions in same hierarchy: " + share.toString());
continue;
}
}
if (overlapHierarchies > 1) {
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " joint dimensions has overlap with more than 1 hierarchies");
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " joint dimensions has overlap with more than 1 hierarchies");
continue;
}
}
@@ -175,7 +174,7 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
}
existing.addAll(oneJoint);
}
- context.addResult(ResultLevel.ERROR, "Aggregation group " + index + " a dimension exists in more than one joint: " + overlap.toString());
+ context.addResult(ResultLevel.ERROR, AGGREGATION_GROUP + index + " a dimension exists in more than one joint: " + overlap.toString());
continue;
}
}
@@ -186,9 +185,8 @@ public class AggregationGroupRule implements IValidatorRule<CubeDesc> {
combination = getMaxCombinations(cube) + 1;
} finally {
if (combination > getMaxCombinations(cube)) {
- String msg = "Aggregation group " + index + " has too many combinations, current combination is " + combination + ", max allowed combination is " + getMaxCombinations(cube) + "; use 'mandatory'/'hierarchy'/'joint' to optimize; or update 'kylin.cube.aggrgroup.max-combination' to a bigger value.";
+ String msg = AGGREGATION_GROUP + index + " has too many combinations, current combination is " + combination + ", max allowed combination is " + getMaxCombinations(cube) + "; use 'mandatory'/'hierarchy'/'joint' to optimize; or update 'kylin.cube.aggrgroup.max-combination' to a bigger value.";
context.addResult(ResultLevel.ERROR, msg);
- continue;
}
}
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
index b8d3144..8548781 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java
@@ -57,6 +57,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected static final String INTERRUPT_TIME = "interruptTime";
protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class);
+ public static final String NO_NEED_TO_SEND_EMAIL_USER_LIST_IS_EMPTY = "no need to send email, user list is empty";
protected int retry = 0;
private KylinConfig config;
@@ -103,8 +104,8 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
exception = e;
try {
Thread.sleep(1000L * (long) Math.pow(4, nRetry));
- } catch (InterruptedException exp) {
- throw new RuntimeException(exp);
+ } catch (InterruptedException e1) {
+ throw new IllegalStateException(e1);
}
} else {
throw e;
@@ -145,7 +146,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
@Override
public final ExecuteResult execute(ExecutableContext executableContext) throws ExecuteException {
- logger.info("Executing AbstractExecutable (" + this.getName() + ")");
+ logger.info("Executing AbstractExecutable ({})", this.getName());
Preconditions.checkArgument(executableContext instanceof DefaultContext);
ExecuteResult result = null;
@@ -156,14 +157,14 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
Throwable realException;
do {
if (retry > 0) {
- logger.info("Retry " + retry);
+ logger.info("Retry {}", retry);
}
catchedException = null;
result = null;
try {
result = doWork(executableContext);
} catch (Throwable e) {
- logger.error("error running Executable: " + this.toString());
+ logger.error("error running Executable: {}", this.toString());
catchedException = e;
}
retry++;
@@ -191,7 +192,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
protected void handleMetadataPersistException(ExecutableContext context, Throwable exception) {
final String[] adminDls = context.getConfig().getAdminDls();
if (adminDls == null || adminDls.length < 1) {
- logger.warn("no need to send email, user list is empty");
+ logger.warn(NO_NEED_TO_SEND_EMAIL_USER_LIST_IS_EMPTY);
return;
}
List<String> users = Lists.newArrayList(adminDls);
@@ -199,7 +200,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
Map<String, Object> dataMap = Maps.newHashMap();
dataMap.put("job_name", getName());
dataMap.put("env_name", context.getConfig().getDeployEnv());
- dataMap.put("submitter", StringUtil.noBlank(getSubmitter(), "missing submitter"));
+ dataMap.put(SUBMITTER, StringUtil.noBlank(getSubmitter(), "missing submitter"));
dataMap.put("job_engine", MailNotificationUtil.getLocalHostName());
dataMap.put("error_log",
Matcher.quoteReplacement(StringUtil.noBlank(exception.getMessage(), "no error message")));
@@ -314,7 +315,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
try {
List<String> users = getAllNofifyUsers(config);
if (users.isEmpty()) {
- logger.debug("no need to send email, user list is empty");
+ logger.debug(NO_NEED_TO_SEND_EMAIL_USER_LIST_IS_EMPTY);
return;
}
final Pair<String, String> email = formatNotifications(context, state);
@@ -341,10 +342,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
logger.warn("no need to send email, content is null");
return;
}
- logger.info("prepare to send email to:" + users);
- logger.info("job name:" + getName());
- logger.info("submitter:" + getSubmitter());
- logger.info("notify list:" + users);
+ logger.info("prepare to send email to:{}", users);
+ logger.info("job name:{}", getName());
+ logger.info("submitter:{}", getSubmitter());
+ logger.info("notify list:{}", users);
new MailService(kylinConfig).sendMail(users, email.getFirst(), email.getSecond());
}
@@ -352,7 +353,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent {
try {
List<String> users = getAllNofifyUsers(config);
if (users.isEmpty()) {
- logger.debug("no need to send email, user list is empty");
+ logger.debug(NO_NEED_TO_SEND_EMAIL_USER_LIST_IS_EMPTY);
return;
}
doSendMail(config, users, email);
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
index a1e2665..9771352 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectDistinctCountAggFunc.java
@@ -50,10 +50,8 @@ public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount
this.keyList = keyList;
}
if (this.keyList != null && this.keyList.contains(key)) {
- BitmapCounter counter = map.get(key);
- if (counter == null) {
- map.put(key, counter = factory.newBitmap());
- }
+ BitmapCounter counter = map.computeIfAbsent(key, o -> factory.newBitmap());
+
counter.orWith((BitmapCounter) value);
}
}
@@ -78,7 +76,7 @@ public class BitmapIntersectDistinctCountAggFunc implements ParamAsMeasureCount
counter.andWith(c);
}
}
- return counter.getCount();
+ return counter != null ? counter.getCount() : 0;
}
}
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
index d5ecc16..b9c03b6 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/cachesync/Broadcaster.java
@@ -98,7 +98,7 @@ public class Broadcaster {
if (nodes == null || nodes.length < 1) {
logger.warn("There is no available rest server; check the 'kylin.server.cluster-servers' config");
}
- logger.debug(nodes.length + " nodes in the cluster: " + Arrays.toString(nodes));
+ logger.debug("{} nodes in the cluster: {}", (nodes == null ? 0 : nodes.length), Arrays.toString(nodes));
announceMainLoop.execute(new Runnable() {
@Override
@@ -110,7 +110,7 @@ public class Broadcaster {
final BroadcastEvent broadcastEvent = broadcastEvents.takeFirst();
String[] restServers = config.getRestServers();
- logger.debug("Servers in the cluster: " + Arrays.toString(restServers));
+ logger.debug("Servers in the cluster: {}", Arrays.toString(restServers));
for (final String node : restServers) {
if (restClientMap.containsKey(node) == false) {
restClientMap.put(node, new RestClient(node));
@@ -120,7 +120,7 @@ public class Broadcaster {
String toWhere = broadcastEvent.getTargetNode();
if (toWhere == null)
toWhere = "all";
- logger.debug("Announcing new broadcast to " + toWhere + ": " + broadcastEvent);
+ logger.debug("Announcing new broadcast to {}: {}", toWhere, broadcastEvent);
for (final String node : restServers) {
if (!(toWhere.equals("all") || toWhere.equals(node)))
@@ -196,11 +196,7 @@ public class Broadcaster {
}
private static void addListener(Map<String, List<Listener>> lmap, String entity, Listener listener) {
- List<Listener> list = lmap.get(entity);
- if (list == null) {
- list = new ArrayList<>();
- lmap.put(entity, list);
- }
+ List<Listener> list = lmap.computeIfAbsent(entity, s -> new ArrayList<>());
list.add(listener);
}
@@ -244,7 +240,7 @@ public class Broadcaster {
if (list.isEmpty())
return;
- logger.debug("Broadcasting " + event + ", " + entity + ", " + cacheKey);
+ logger.debug("Broadcasting {}, {}, {}", event, entity, cacheKey);
switch (entity) {
case SYNC_ALL:
@@ -278,7 +274,7 @@ public class Broadcaster {
break;
}
- logger.debug("Done broadcasting " + event + ", " + entity + ", " + cacheKey);
+ logger.debug("Done broadcasting {}, {}, {}", event, entity, cacheKey);
}
/**
@@ -363,7 +359,7 @@ public class Broadcaster {
}
}
- abstract public static class Listener {
+ public abstract static class Listener {
public void onClearAll(Broadcaster broadcaster) throws IOException {
}
@@ -452,10 +448,7 @@ public class Broadcaster {
if (!StringUtils.equals(cacheKey, other.cacheKey)) {
return false;
}
- if (!StringUtils.equals(entity, other.entity)) {
- return false;
- }
- return true;
+ return StringUtils.equals(entity, other.entity);
}
@Override
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
index 3669a32..f3dc78e 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/AbstractJdbcAdaptor.java
@@ -23,8 +23,8 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.sql.rowset.CachedRowSet;
@@ -57,7 +57,7 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
* @param config Basic configuration of JDBC source, such as driver name, URL, username, password.
* @throws Exception If datasource cannot be connected.
*/
- protected AbstractJdbcAdaptor(AdaptorConfig config) throws Exception {
+ protected AbstractJdbcAdaptor(AdaptorConfig config) throws ClassNotFoundException {
this.config = config;
this.dataSourceDef = DataSourceDefProvider.getInstance().getById(config.datasourceId);
@@ -138,7 +138,7 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
try {
closeable.close();
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new IllegalStateException(e);
}
}
@@ -151,7 +151,7 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
try {
dataSource.close();
} catch (SQLException e) {
- throw new RuntimeException(e);
+ throw new IllegalStateException(e);
}
}
@@ -367,7 +367,7 @@ public abstract class AbstractJdbcAdaptor implements Closeable {
* @param columnInfo The column information, in the pair of NAME -> TYPE
* @return A set of SQL Statements which can be executed in JDBC source.
*/
- public abstract String[] buildSqlToCreateTable(String identity, LinkedHashMap<String, String> columnInfo);
+ public abstract String[] buildSqlToCreateTable(String identity, Map<String, String> columnInfo);
/**
* To build a set of sql statements to create view in JDBC source.
diff --git a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
index 3a14a1d..442b78e 100644
--- a/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
+++ b/datasource-sdk/src/main/java/org/apache/kylin/sdk/datasource/adaptor/DefaultAdaptor.java
@@ -22,7 +22,6 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Types;
import java.util.ArrayList;
-import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
@@ -246,7 +245,7 @@ public class DefaultAdaptor extends AbstractJdbcAdaptor {
}
@Override
- public String[] buildSqlToCreateTable(String tableIdentity, LinkedHashMap<String, String> columnInfo) {
+ public String[] buildSqlToCreateTable(String tableIdentity, Map<String, String> columnInfo) {
String dropsql = "DROP TABLE IF EXISTS " + tableIdentity;
String dropsql2 = "DROP VIEW IF EXISTS " + tableIdentity;
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
index a504899..037ab0b 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/ByteArrayWritable.java
@@ -130,12 +130,12 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
* @see java.lang.Object#equals(java.lang.Object)
*/
@Override
- public boolean equals(Object right_obj) {
- if (right_obj instanceof byte[]) {
- return compareTo((byte[]) right_obj) == 0;
+ public boolean equals(Object other) {
+ if (other instanceof byte[]) {
+ return compareTo((byte[]) other) == 0;
}
- if (right_obj instanceof ByteArrayWritable) {
- return compareTo((ByteArrayWritable) right_obj) == 0;
+ if (other instanceof ByteArrayWritable) {
+ return compareTo((ByteArrayWritable) other) == 0;
}
return false;
}
@@ -162,7 +162,7 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
/** A Comparator optimized for ByteArrayWritable.
*/
public static class Comparator extends WritableComparator {
- private BytesWritable.Comparator comparator = new BytesWritable.Comparator();
+ private BytesWritable.Comparator instance = new BytesWritable.Comparator();
/** constructor */
public Comparator() {
@@ -174,7 +174,7 @@ public class ByteArrayWritable implements WritableComparable<ByteArrayWritable>
*/
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
- return comparator.compare(b1, s1, l1, b2, s2, l2);
+ return instance.compare(b1, s1, l1, b2, s2, l2);
}
}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
deleted file mode 100644
index 3419949..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerJob.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerJob extends AbstractHadoopJob {
-
- @SuppressWarnings("static-access")
- protected static final Option ROW_KEY_STATS_FILE_PATH = OptionBuilder.withArgName("path").hasArg().isRequired(true).withDescription("rowKeyStatsFilePath").create("rowKeyStatsFilePath");
-
- @Override
- public int run(String[] args) throws Exception {
- Options options = new Options();
-
- options.addOption(OPTION_INPUT_PATH);
- options.addOption(OPTION_OUTPUT_PATH);
- options.addOption(OPTION_JOB_NAME);
- options.addOption(ROW_KEY_STATS_FILE_PATH);
-
- parseOptions(options, args);
-
- String statsFilePath = getOptionValue(ROW_KEY_STATS_FILE_PATH);
-
- // start job
- String jobName = getOptionValue(OPTION_JOB_NAME);
- job = Job.getInstance(getConf(), jobName);
-
- setJobClasspath(job, KylinConfig.getInstanceFromEnv());
-
- addInputDirs(getOptionValue(OPTION_INPUT_PATH), job);
-
- Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
- FileOutputFormat.setOutputPath(job, output);
-
- // Mapper
- job.setInputFormatClass(SequenceFileInputFormat.class);
- job.setMapperClass(RowKeyDistributionCheckerMapper.class);
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
-
- // Reducer - only one
- job.setReducerClass(RowKeyDistributionCheckerReducer.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- job.setNumReduceTasks(1);
-
- job.getConfiguration().set("rowKeyStatsFilePath", statsFilePath);
-
- this.deletePath(job.getConfiguration(), output);
-
- return waitForCompletion(job);
- }
-
- public static void main(String[] args) throws Exception {
- int exitCode = ToolRunner.run(new RowKeyDistributionCheckerJob(), args);
- System.exit(exitCode);
- }
-
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
deleted file mode 100644
index d428dcb..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerMapper.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.SequenceFile;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.engine.mr.KylinMapper;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerMapper extends KylinMapper<Text, Text, Text, LongWritable> {
-
- String rowKeyStatsFilePath;
- byte[][] splitKeys;
- Map<Text, Long> resultMap;
- List<Text> keyList;
-
- @Override
- protected void doSetup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
-
- rowKeyStatsFilePath = context.getConfiguration().get("rowKeyStatsFilePath");
- splitKeys = this.getSplits(context.getConfiguration(), new Path(rowKeyStatsFilePath));
-
- resultMap = new HashMap<Text, Long>();
- keyList = new ArrayList<Text>();
- for (int i = 0; i < splitKeys.length; i++) {
- Text key = new Text(splitKeys[i]);
- resultMap.put(key, 0L);
- keyList.add(new Text(splitKeys[i]));
- }
- }
-
- @Override
- public void doMap(Text key, Text value, Context context) throws IOException, InterruptedException {
- for (Text t : keyList) {
- if (key.compareTo(t) < 0) {
- Long v = resultMap.get(t);
- long length = (long)key.getLength() + value.getLength();
- v += length;
- resultMap.put(t, v);
- break;
- }
- }
- }
-
- @Override
- protected void doCleanup(Context context) throws IOException, InterruptedException {
- LongWritable outputValue = new LongWritable();
- for (Entry<Text, Long> kv : resultMap.entrySet()) {
- outputValue.set(kv.getValue());
- context.write(kv.getKey(), outputValue);
- }
- }
-
- @SuppressWarnings("deprecation")
- public byte[][] getSplits(Configuration conf, Path path) {
- List<byte[]> rowkeyList = new ArrayList<byte[]>();
- SequenceFile.Reader reader = null;
- try {
- reader = new SequenceFile.Reader(HadoopUtil.getFileSystem(path, conf), path, conf);
- Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
- Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
- while (reader.next(key, value)) {
- byte[] tmp = ((Text) key).copyBytes();
- if (rowkeyList.contains(tmp) == false) {
- rowkeyList.add(tmp);
- }
- }
- } catch (Exception e) {
- e.printStackTrace();
- } finally {
- IOUtils.closeStream(reader);
- }
-
- byte[][] retValue = rowkeyList.toArray(new byte[rowkeyList.size()][]);
-
- return retValue;
- }
-}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
deleted file mode 100644
index af4a7df..0000000
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/RowKeyDistributionCheckerReducer.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.engine.mr.steps;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.kylin.engine.mr.KylinReducer;
-
-/**
- * @author ysong1
- *
- */
-public class RowKeyDistributionCheckerReducer extends KylinReducer<Text, LongWritable, Text, LongWritable> {
-
- LongWritable outputKey = new LongWritable(0L);
-
- @Override
- protected void doSetup(Context context) throws IOException {
- super.bindCurrentConfiguration(context.getConfiguration());
- }
-
- @Override
- public void doReduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
-
- long length = 0;
- for (LongWritable v : values) {
- length += v.get();
- }
-
- outputKey.set(length);
- context.write(key, outputKey);
- }
-}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java b/server-base/src/main/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
index c6247f0..a277cf3 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/broadcaster/BroadcasterReceiveServlet.java
@@ -24,7 +24,6 @@ import java.io.InputStreamReader;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -40,7 +39,7 @@ public class BroadcasterReceiveServlet extends HttpServlet {
void handle(String type, String name, String event);
}
- private BroadcasterHandler handler;
+ private final BroadcasterHandler handler;
public BroadcasterReceiveServlet(BroadcasterHandler handler) {
this.handler = handler;
@@ -50,16 +49,16 @@ public class BroadcasterReceiveServlet extends HttpServlet {
private static final Pattern PATTERN2 = Pattern.compile("/(.+)/(.+)");
@Override
- protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doPut(HttpServletRequest req, HttpServletResponse resp) throws IOException {
handle(req, resp);
}
@Override
- protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws IOException {
handle(req, resp);
}
- private void handle(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+ private void handle(HttpServletRequest req, HttpServletResponse resp) throws IOException {
final String startString = "/kylin/api/cache";
final String requestURI = req.getRequestURI();
final String substring = requestURI.substring(requestURI.indexOf(startString) + startString.length());
diff --git a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcExplorer.java b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcExplorer.java
index 0875244..e9330e1 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcExplorer.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcExplorer.java
@@ -74,55 +74,57 @@ public class JdbcExplorer implements ISourceMetadataExplorer, ISampleDataDeploye
tableDesc.setProject(prj);
tableDesc.setSourceType(JdbcSource.SOURCE_ID);
- CachedRowSet tables = dataSource.getTable(database, table);
- String tableType = null;
- while (tables.next()) {
- tableType = tables.getString("TABLE_TYPE");
- }
- if (tableType != null) {
- tableDesc.setTableType(tableType);
- } else {
- throw new RuntimeException(String.format(Locale.ROOT, "table %s not found in schema:%s", table, database));
+ try (CachedRowSet tables = dataSource.getTable(database, table)) {
+ String tableType = null;
+ while (tables.next()) {
+ tableType = tables.getString("TABLE_TYPE");
+ }
+ if (tableType != null) {
+ tableDesc.setTableType(tableType);
+ } else {
+ throw new RuntimeException(String.format(Locale.ROOT, "table %s not found in schema:%s", table, database));
+ }
}
- CachedRowSet columns = dataSource.listColumns(database, table);
- List<ColumnDesc> columnDescs = new ArrayList<>();
-
- while (columns.next()) {
- String cname = columns.getString("COLUMN_NAME");
- int type = columns.getInt("DATA_TYPE");
- int csize = columns.getInt("COLUMN_SIZE");
- int digits = columns.getInt("DECIMAL_DIGITS");
- int pos = columns.getInt("ORDINAL_POSITION");
- String remarks = columns.getString("REMARKS");
-
- ColumnDesc cdesc = new ColumnDesc();
- cdesc.setName(cname.toUpperCase(Locale.ROOT));
-
- String kylinType = dataSource.toKylinTypeName(type);
- if ("any".equals(kylinType)) {
- String typeName = columns.getString("TYPE_NAME");
- int kylinTypeId = dataSource.toKylinTypeId(typeName, type);
- kylinType = dataSource.toKylinTypeName(kylinTypeId);
+ try (CachedRowSet columns = dataSource.listColumns(database, table)) {
+ List<ColumnDesc> columnDescs = new ArrayList<>();
+
+ while (columns.next()) {
+ String cname = columns.getString("COLUMN_NAME");
+ int type = columns.getInt("DATA_TYPE");
+ int csize = columns.getInt("COLUMN_SIZE");
+ int digits = columns.getInt("DECIMAL_DIGITS");
+ int pos = columns.getInt("ORDINAL_POSITION");
+ String remarks = columns.getString("REMARKS");
+
+ ColumnDesc cdesc = new ColumnDesc();
+ cdesc.setName(cname.toUpperCase(Locale.ROOT));
+
+ String kylinType = dataSource.toKylinTypeName(type);
+ if ("any".equals(kylinType)) {
+ String typeName = columns.getString("TYPE_NAME");
+ int kylinTypeId = dataSource.toKylinTypeId(typeName, type);
+ kylinType = dataSource.toKylinTypeName(kylinTypeId);
+ }
+ int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? csize : -1;
+ int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? digits : -1;
+
+ cdesc.setDatatype(new DataType(kylinType, precision, scale).toString());
+ cdesc.setId(String.valueOf(pos));
+ cdesc.setComment(remarks);
+ columnDescs.add(cdesc);
}
- int precision = (SqlUtil.isPrecisionApplicable(kylinType) && csize > 0) ? csize : -1;
- int scale = (SqlUtil.isScaleApplicable(kylinType) && digits > 0) ? digits : -1;
- cdesc.setDatatype(new DataType(kylinType, precision, scale).toString());
- cdesc.setId(String.valueOf(pos));
- cdesc.setComment(remarks);
- columnDescs.add(cdesc);
- }
-
- tableDesc.setColumns(columnDescs.toArray(new ColumnDesc[columnDescs.size()]));
+ tableDesc.setColumns(columnDescs.toArray(new ColumnDesc[columnDescs.size()]));
- TableExtDesc tableExtDesc = new TableExtDesc();
- tableExtDesc.setIdentity(tableDesc.getIdentity());
- tableExtDesc.setUuid(UUID.randomUUID().toString());
- tableExtDesc.setLastModified(0);
- tableExtDesc.init(prj);
+ TableExtDesc tableExtDesc = new TableExtDesc();
+ tableExtDesc.setIdentity(tableDesc.getIdentity());
+ tableExtDesc.setUuid(UUID.randomUUID().toString());
+ tableExtDesc.setLastModified(0);
+ tableExtDesc.init(prj);
- return Pair.newPair(tableDesc, tableExtDesc);
+ return Pair.newPair(tableDesc, tableExtDesc);
+ }
}
@Override
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
index 56e2dd5..5899c7c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaSampleProducer.java
@@ -52,13 +52,13 @@ public class KafkaSampleProducer {
private static final Option OPTION_BROKER = OptionBuilder.withArgName("broker").hasArg().isRequired(true).withDescription("Kafka broker").create("broker");
private static final Option OPTION_INTERVAL = OptionBuilder.withArgName("interval").hasArg().isRequired(false).withDescription("Simulated message interval in mili-seconds, default 1000").create("interval");
+ protected static final String OTHER = "Other";
private static final ObjectMapper mapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
logger.info("args: {}", Arrays.toString(args));
OptionsHelper optionsHelper = new OptionsHelper();
Options options = new Options();
- String topic, broker;
options.addOption(OPTION_TOPIC);
options.addOption(OPTION_BROKER);
options.addOption(OPTION_INTERVAL);
@@ -66,8 +66,8 @@ public class KafkaSampleProducer {
logger.info("options: '{}'", optionsHelper.getOptionsAsString());
- topic = optionsHelper.getOptionValue(OPTION_TOPIC);
- broker = optionsHelper.getOptionValue(OPTION_BROKER);
+ final String topic = optionsHelper.getOptionValue(OPTION_TOPIC);
+ final String broker = optionsHelper.getOptionValue(OPTION_BROKER);
long interval = 10;
String intervalString = optionsHelper.getOptionValue(OPTION_INTERVAL);
@@ -83,18 +83,18 @@ public class KafkaSampleProducer {
countries.add("JAPAN");
countries.add("KOREA");
countries.add("US");
- countries.add("Other");
+ countries.add(OTHER);
List<String> category = new ArrayList<>();
category.add("BOOK");
category.add("TOY");
category.add("CLOTH");
category.add("ELECTRONIC");
- category.add("Other");
+ category.add(OTHER);
List<String> devices = new ArrayList<>();
devices.add("iOS");
devices.add("Windows");
devices.add("Andriod");
- devices.add("Other");
+ devices.add(OTHER);
List<String> genders = new ArrayList<>();
genders.add("Male");