Posted to commits@kylin.apache.org by ni...@apache.org on 2019/12/06 12:36:38 UTC
[kylin] branch master updated: KYLIN-4167 Phase2 (#961)
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new f2bea28 KYLIN-4167 Phase2 (#961)
f2bea28 is described below
commit f2bea280d12061b2fb5f39f5b301579178d70e7a
Author: Xiaoxiang Yu <hi...@126.com>
AuthorDate: Fri Dec 6 20:36:30 2019 +0800
KYLIN-4167 Phase2 (#961)
* KYLIN-4167 Phase2
- Fix bugs when a cubing job has been dropped
- Drop the former segment info when submitting a new one
- Add a new log file for the streaming coordinator with more logging to make diagnosis easier
* KYLIN-4010 Auto adjust timezone
* KYLIN-4010 Fix segment purge bugs
---
build/conf/kylin-server-log4j.properties | 24 ++++-
.../org/apache/kylin/common/KylinConfigBase.java | 10 +-
.../org/apache/kylin/common/util/DateFormat.java | 14 ++-
.../org/apache/kylin/common/util/TimeUtil.java | 6 +-
.../src/main/resources/kylin-defaults.properties | 35 ++++++-
.../kylin/dimension/TimeDerivedColumnType.java | 18 ++++
.../kylin/storage/gtrecord/CubeTupleConverter.java | 17 +--
.../apache/kylin/query/relnode/OLAPFilterRel.java | 2 +-
.../query/relnode/visitor/TupleFilterVisitor.java | 13 ++-
.../rest/controller/StreamingV2Controller.java | 11 +-
.../kylin/storage/stream/StreamStorageQuery.java | 2 +-
.../stream/coordinator/StreamMetadataStore.java | 23 ++++
.../coordinator/ZookeeperStreamMetadataStore.java | 116 ++++++++++++++++++++-
.../coordinator/assign/AssignmentsCache.java | 66 ++++++++----
.../stream/coordinator/assign/DefaultAssigner.java | 9 ++
.../coordinator/coordinate/BuildJobSubmitter.java | 93 +++++++++++------
.../coordinate/ReceiverClusterManager.java | 35 ++++---
.../coordinate/StreamingCoordinator.java | 14 +--
.../coordinator/doctor/ClusterStateChecker.java | 64 ++++++++++--
.../coordinate/BuildJobSubmitterTest.java | 7 +-
.../coordinator/coordinate/StreamingTestBase.java | 1 -
.../stream/core/client/ReceiverAdminClient.java | 5 +-
.../core/consumer/StreamingConsumerChannel.java | 1 +
.../core/query/StreamingDataQueryPlanner.java | 15 ++-
.../stream/core/query/StreamingTupleConverter.java | 4 +-
.../core/source/IStreamingMessageParser.java | 3 +
.../storage/columnar/FragmentFileSearcher.java | 23 ++--
.../core/util/CompareFilterTimeRangeChecker.java | 17 ++-
.../stream/source/kafka/TimedJsonStreamParser.java | 11 +-
29 files changed, 525 insertions(+), 134 deletions(-)
diff --git a/build/conf/kylin-server-log4j.properties b/build/conf/kylin-server-log4j.properties
index e4d204f..bcaea65 100644
--- a/build/conf/kylin-server-log4j.properties
+++ b/build/conf/kylin-server-log4j.properties
@@ -25,8 +25,24 @@ log4j.appender.file.Append=true
log4j.appender.file.MaxFileSize=268435456
log4j.appender.file.MaxBackupIndex=10
+log4j.appender.realtime=org.apache.log4j.RollingFileAppender
+log4j.appender.realtime.layout=org.apache.log4j.PatternLayout
+log4j.appender.realtime.File=${catalina.home}/../logs/streaming_coordinator.log
+log4j.appender.realtime.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{2}:%L : %m%n
+log4j.appender.realtime.Append=true
+log4j.appender.realtime.MaxFileSize=268435456
+log4j.appender.realtime.MaxBackupIndex=10
+
#overall config
-log4j.rootLogger=INFO,file
-log4j.logger.org.apache.kylin=DEBUG
-log4j.logger.org.springframework=WARN
-log4j.logger.org.springframework.security=INFO
\ No newline at end of file
+log4j.rootLogger=INFO
+log4j.logger.org.apache.kylin=DEBUG,file
+log4j.logger.org.springframework=WARN,file
+log4j.logger.org.springframework.security=INFO,file
+
+log4j.additivity.org.apache.kylin.stream=false
+log4j.logger.org.apache.kylin.stream=TRACE,realtime
+log4j.logger.org.apache.kylin.job=DEBUG,realtime
+log4j.logger.org.apache.kylin.rest.service.StreamingCoordinatorService=DEBUG,realtime
+log4j.logger.org.apache.kylin.rest.service.StreamingV2Service=DEBUG,realtime
+log4j.logger.org.apache.kylin.rest.controller.StreamingCoordinatorController=DEBUG,realtime
+log4j.logger.org.apache.kylin.rest.controller.StreamingV2Controller=DEBUG,realtime
\ No newline at end of file
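For reference, log4j 1.x keys additivity by the logger name itself; with additivity off, a logger's events stop at its own appenders instead of also flowing to parent appenders. A minimal sketch with a hypothetical logger name:

    log4j.logger.com.example.stream=DEBUG,realtime
    # without the next line, the same events would also reach any parent logger's appenders
    log4j.additivity.com.example.stream=false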
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index bbffcc1..d44d944 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2116,7 +2116,7 @@ public abstract class KylinConfigBase implements Serializable {
}
// ============================================================================
- // streaming
+ // Realtime streaming
// ============================================================================
public String getStreamingStoreClass() {
return getOptional("kylin.stream.store.class",
@@ -2281,8 +2281,12 @@ public abstract class KylinConfigBase implements Serializable {
/**
* whether realtime query should add timezone offset by kylin's web-timezone, please refer to KYLIN-4010 for detail
*/
- public boolean isStreamingAutoJustTimezone() {
- return Boolean.parseBoolean(getOptional("kylin.stream.auto.just.by.timezone", "false"));
+ public String getStreamingDerivedTimeTimezone() {
+ return (getOptional("kylin.stream.event.timezone", ""));
+ }
+
+ public boolean isAutoResubmitDiscardJob(){
+ return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
}
// ============================================================================
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
index 1c6b551..2f51cbb 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/DateFormat.java
@@ -32,6 +32,7 @@ public class DateFormat {
public static final String DEFAULT_TIME_PATTERN = "HH:mm:ss";
public static final String DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS = "yyyy-MM-dd HH:mm:ss";
public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS = "yyyy-MM-dd HH:mm:ss.SSS";
+ public static final String DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS_OFFSET = "yyyy-MM-dd HH:mm:ss.SSSZZ";
public static final String YYYY_MM_DD_HH_MM = "yyyy-MM-dd HH:mm";
public static final String YYYY_MM_DD_HH = "yyyy-MM-dd HH";
public static final String YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
@@ -39,11 +40,12 @@ public class DateFormat {
public static final String YYYYMMDDHH = "yyyyMMddHH";
public static final String ISO_8601_24H_FULL_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSZZ";
- public static final String[] SUPPORTED_DATETIME_PATTERN = { //
- DEFAULT_DATE_PATTERN, //
- DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS, //
- DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS, //
- COMPACT_DATE_PATTERN, //
+ public static final String[] SUPPORTED_DATETIME_PATTERN = {
+ DEFAULT_DATE_PATTERN,
+ DEFAULT_DATETIME_PATTERN_WITHOUT_MILLISECONDS,
+ DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS,
+ DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS_OFFSET,
+ COMPACT_DATE_PATTERN,
ISO_8601_24H_FULL_FORMAT,
YYYY_MM_DD_HH_MM,
YYYY_MM_DD_HH,
@@ -143,6 +145,8 @@ public class DateFormat {
} else if (str.length() > 19) {
if (str.contains("T")) {
return stringToDate(str, ISO_8601_24H_FULL_FORMAT).getTime();
+ } else if (str.contains("+")) {
+ return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS_OFFSET).getTime();
} else {
return stringToDate(str, DEFAULT_DATETIME_PATTERN_WITH_MILLISECONDS).getTime();
}
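A short sketch of parsing the new offset-bearing pattern, assuming stringToDate delegates to commons-lang3 FastDateFormat (whose "ZZ" token matches an ISO 8601 offset such as "+08:00"); the class name and input value below are hypothetical:

    import java.util.Date;
    import java.util.TimeZone;
    import org.apache.commons.lang3.time.FastDateFormat;

    public class OffsetPatternDemo {
        public static void main(String[] args) throws Exception {
            FastDateFormat fmt = FastDateFormat.getInstance(
                    "yyyy-MM-dd HH:mm:ss.SSSZZ", TimeZone.getTimeZone("GMT"));
            // The explicit +08:00 offset in the input takes precedence over the default zone.
            Date d = fmt.parse("2019-12-06 20:36:30.000+08:00");
            System.out.println(d.getTime()); // epoch millis with the offset applied
        }
    }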
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
index 6eb324b..cd53cc6 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java
@@ -31,9 +31,9 @@ public class TimeUtil {
}
private static TimeZone gmt = TimeZone.getTimeZone("GMT");
- public static long ONE_MINUTE_TS = 60 * 1000L;
- public static long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
- public static long ONE_DAY_TS = 24 * ONE_HOUR_TS;
+ public static final long ONE_MINUTE_TS = 60 * 1000L;
+ public static final long ONE_HOUR_TS = 60 * ONE_MINUTE_TS;
+ public static final long ONE_DAY_TS = 24 * ONE_HOUR_TS;
public static long getMinuteStart(long ts) {
return ts / ONE_MINUTE_TS * ONE_MINUTE_TS;
diff --git a/core-common/src/main/resources/kylin-defaults.properties b/core-common/src/main/resources/kylin-defaults.properties
index 10ca8e6..cc6ee38 100644
--- a/core-common/src/main/resources/kylin-defaults.properties
+++ b/core-common/src/main/resources/kylin-defaults.properties
@@ -368,4 +368,37 @@ kylin.engine.spark-conf-mergedict.spark.memory.fraction=0.2
#kylin.engine.livy-conf.livy-enabled=false
#kylin.engine.livy-conf.livy-url=http://LivyHost:8998
#kylin.engine.livy-conf.livy-key.file=hdfs:///path-to-kylin-job-jar
-#kylin.engine.livy-conf.livy-arr.jars=hdfs:///path-to-hadoop-dependency-jar
\ No newline at end of file
+#kylin.engine.livy-conf.livy-arr.jars=hdfs:///path-to-hadoop-dependency-jar
+
+
+### Realtime OLAP ###
+
+# Where the local segment cache is located; if it is not an absolute path, the real path will be ${KYLIN_HOME}/${kylin.stream.index.path}
+kylin.stream.index.path=stream_index
+
+# The timezone for Derived Time Columns such as hour_start; try setting it to GMT+N, see KYLIN-4010 for details
+kylin.stream.event.timezone=
+
+# Debug switch for printing realtime global dict encoding information, see KYLIN-4141 for details
+kylin.stream.print-realtime-dict-enabled=false
+
+# Whether to enable the new coordinator, see KYLIN-4167 for details
+kylin.stream.new.coordinator-enabled=true
+
+# How the receiver's metrics info should be collected
+#kylin.stream.metrics.option=console/csv/jmx
+
+# When enabling a streaming cube, whether to consume from the latest or the earliest offset
+kylin.stream.consume.offsets.latest=true
+
+# The parallelism of scans on the receiver side
+kylin.stream.receiver.use-threads-per-query=8
+
+# How the coordinator/receiver registers itself in StreamMetadata; there are three options:
+# 1. hostname:port, then kylin will use the configured host and port as the currentNode;
+# 2. port, then kylin will look up the node's hostname and append the port as the currentNode;
+# 3. not set, then kylin will look up the node's hostname and use it with the default port (7070 for coordinator, 9090 for receiver) as the currentNode.
+#kylin.stream.node=
+
+# Automatically resubmit a building job after it has been discarded
+kylin.stream.auto-resubmit-after-discard-enabled=true
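For example (values are hypothetical), a deployment whose events carry China Standard Time and which wants discarded jobs resubmitted automatically would set:

    kylin.stream.event.timezone=GMT+8
    kylin.stream.auto-resubmit-after-discard-enabled=true

Leaving kylin.stream.event.timezone empty (the default) disables the timezone adjustment entirely.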
diff --git a/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java b/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
index 01953e7..43cff4e 100644
--- a/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/dimension/TimeDerivedColumnType.java
@@ -28,6 +28,8 @@ import org.apache.kylin.common.util.Pair;
import org.apache.kylin.common.util.TimeUtil;
import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public enum TimeDerivedColumnType {
MINUTE_START("minute_start") {
@@ -157,6 +159,7 @@ public enum TimeDerivedColumnType {
private static final String QUARTER_START_NAME = "quarter_start";
private static final String YEAR_START_NAME = "year_start";
private static Map<String, TimeDerivedColumnType> nameColumnsMap = Maps.newHashMap();
+ private static Logger logger = LoggerFactory.getLogger(TimeDerivedColumnType.class);
static {
nameColumnsMap.put(MINUTE_START_NAME, MINUTE_START);
@@ -178,6 +181,14 @@ public enum TimeDerivedColumnType {
return nameColumnsMap.containsKey(columnName.toLowerCase(Locale.ROOT));
}
+ public static boolean isTimeDerivedColumnAboveDayLevel(String columnName) {
+ if (!isTimeDerivedColumn(columnName))
+ return false;
+ else {
+ return !columnName.equalsIgnoreCase(MINUTE_START_NAME) && !columnName.equalsIgnoreCase(HOUR_START_NAME);
+ }
+ }
+
public static TimeDerivedColumnType getTimeDerivedColumnType(String columnName) {
return nameColumnsMap.get(columnName.toLowerCase(Locale.ROOT));
}
@@ -211,6 +222,13 @@ public enum TimeDerivedColumnType {
return calculateTimeUnitRange(time);
}
+ public Pair<Long, Long> getTimeUnitRangeTimezoneAware(Object timeValue, long timezoneOffset){
+ long ts = parseTimeValue(timeValue);
+ Pair<Long, Long> res = calculateTimeUnitRange(ts);
+ res = new Pair<>(res.getFirst() - timezoneOffset, res.getSecond() - timezoneOffset);
+ return res;
+ }
+
abstract public Pair<Long, Long> calculateTimeUnitRange(long time);
abstract public String normalizeTimeFormat(long time);
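A sketch of how the new timezone-aware range might be used, assuming an event timezone of GMT+8 and that parseTimeValue accepts a datetime string the same way the plain getTimeUnitRange path does:

    import java.util.TimeZone;
    import org.apache.kylin.common.util.Pair;
    import org.apache.kylin.dimension.TimeDerivedColumnType;

    public class TzRangeDemo {
        public static void main(String[] args) {
            long offset = TimeZone.getTimeZone("GMT+8").getRawOffset(); // 28,800,000 ms
            // Local hour boundaries minus the offset give the equivalent UTC timestamps.
            Pair<Long, Long> utcRange = TimeDerivedColumnType.HOUR_START
                    .getTimeUnitRangeTimezoneAware("2019-12-06 20:36:30", offset);
            System.out.println(utcRange.getFirst() + " - " + utcRange.getSecond());
        }
    }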
diff --git a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
index 94734b3..8e2a795 100644
--- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
+++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeTupleConverter.java
@@ -69,9 +69,9 @@ public class CubeTupleConverter implements ITupleConverter {
private List<ILookupTable> usedLookupTables;
final Set<Integer> timestampColumn = new HashSet<>();
+ String eventTimezone;
boolean autoJustByTimezone;
- private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
- .getRawOffset();
+ private final long timeZoneOffset;
public final int nSelectedDims;
@@ -92,11 +92,14 @@ public class CubeTupleConverter implements ITupleConverter {
advMeasureFillers = Lists.newArrayListWithCapacity(1);
advMeasureIndexInGTValues = Lists.newArrayListWithCapacity(1);
usedLookupTables = Lists.newArrayList();
- autoJustByTimezone = cubeSeg.getConfig().isStreamingAutoJustTimezone();
- autoJustByTimezone = autoJustByTimezone
+ eventTimezone = cubeSeg.getConfig().getStreamingDerivedTimeTimezone();
+ autoJustByTimezone = eventTimezone.length() > 0
&& cubeSeg.getCubeDesc().getModel().getRootFactTable().getTableDesc().isStreamingTable();
if (autoJustByTimezone) {
logger.debug("Will ajust dimsension for Time Derived Column.");
+ timeZoneOffset = TimeZone.getTimeZone(eventTimezone).getRawOffset();
+ } else {
+ timeZoneOffset = 0;
}
////////////
@@ -105,8 +108,10 @@ public class CubeTupleConverter implements ITupleConverter {
// pre-calculate dimension index mapping to tuple
for (TblColRef dim : selectedDimensions) {
tupleIdx[i] = tupleInfo.hasColumn(dim) ? tupleInfo.getColumnIndex(dim) : -1;
- if (dim.getType().isDateTimeFamily() && TimeDerivedColumnType.isTimeDerivedColumn(dim.getName()))
+ if (TimeDerivedColumnType.isTimeDerivedColumn(dim.getName())
+ && !TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(dim.getName())) {
timestampColumn.add(tupleIdx[i]);
+ }
i++;
}
@@ -167,7 +172,7 @@ public class CubeTupleConverter implements ITupleConverter {
try {
String v = toString(gtValues[i]);
if (v != null) {
- tuple.setDimensionValue(ti, Long.toString(Long.parseLong(v) + TIME_ZONE_OFFSET));
+ tuple.setDimensionValue(ti, Long.toString(Long.parseLong(v) + timeZoneOffset));
}
} catch (NumberFormatException nfe) {
logger.warn("{} is not a long value.", gtValues[i]);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
index 3a76be6..bfd6c4d 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPFilterRel.java
@@ -55,7 +55,7 @@ public class OLAPFilterRel extends Filter implements OLAPRel {
ColumnRowType columnRowType;
OLAPContext context;
- boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().isStreamingAutoJustTimezone();
+ boolean autoJustTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0;
public OLAPFilterRel(RelOptCluster cluster, RelTraitSet traits, RelNode child, RexNode condition) {
super(cluster, traits, child, condition);
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
index 3e07eca..e0792a0 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/visitor/TupleFilterVisitor.java
@@ -52,6 +52,8 @@ import org.apache.kylin.metadata.filter.UnsupportedTupleFilter;
import org.apache.kylin.metadata.filter.function.Functions;
import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.query.relnode.ColumnRowType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.util.GregorianCalendar;
@@ -62,11 +64,13 @@ import java.util.TimeZone;
public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
+ private static Logger logger = LoggerFactory.getLogger(TupleFilterVisitor.class);
+
final ColumnRowType inputRowType;
// is the fact table is a streamingv2 table
private boolean autoJustByTimezone = false;
- private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
+ private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone())
.getRawOffset();
public TupleFilterVisitor(ColumnRowType inputRowType) {
@@ -221,9 +225,10 @@ public class TupleFilterVisitor extends RexVisitorImpl<TupleFilter> {
newValues.add(null);
} else {
long ts = DateFormat.stringToMillis(v.toString());
- // minus offset by timezone in RelNode level
- // this will affect request sent to storage level
- if (autoJustByTimezone) {
+ // Convert date/timestamp column values from the local timezone to UTC by subtracting the offset at the RelNode level.
+ // This changes the request sent to the storage level (receiver), and thus affects segment/fragment level purge.
+ if (autoJustByTimezone && (type.getFamily() == SqlTypeFamily.TIMESTAMP
+ || type.getFamily() == SqlTypeFamily.DATETIME)) {
ts -= TIME_ZONE_OFFSET;
}
newValues.add(String.valueOf(ts));
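The two sides are symmetric: this visitor subtracts the offset before the request reaches the receiver, and CubeTupleConverter (above) adds it back when emitting dimension values. A minimal self-contained sketch of the round trip, with a hypothetical offset and timestamp:

    import java.util.TimeZone;

    public class OffsetRoundTripDemo {
        public static void main(String[] args) {
            long offset = TimeZone.getTimeZone("GMT+8").getRawOffset();
            long localTs = 1575628800000L;       // a filter literal parsed in local time
            long storageTs = localTs - offset;   // what the visitor sends to storage (UTC)
            long displayTs = storageTs + offset; // what the converter restores for the tuple
            System.out.println(displayTs == localTs); // true
        }
    }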
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
index f22ec89..cfd7086 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingV2Controller.java
@@ -34,6 +34,7 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.dimension.TimeDerivedColumnType;
import org.apache.kylin.metadata.datatype.DataType;
import org.apache.kylin.metadata.model.ColumnDesc;
import org.apache.kylin.metadata.model.ISourceAware;
@@ -189,7 +190,7 @@ public class StreamingV2Controller extends BasicController {
List<FieldSchema> fields;
try {
HiveMetaStoreClient metaStoreClient = new HiveMetaStoreClient(new HiveConf());
- fields = metaStoreClient.getFields(tableDesc.getDatabase(), tableDesc.getName());
+ fields = metaStoreClient.getFields(KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable(), tableDesc.getName());
} catch (NoSuchObjectException noObjectException) {
logger.info("table not exist in hive meta store for table:" + tableDesc.getIdentity(),
noObjectException);
@@ -208,8 +209,12 @@ public class StreamingV2Controller extends BasicController {
for (ColumnDesc columnDesc : tableDesc.getColumns()) {
FieldSchema fieldSchema = fieldSchemaMap.get(columnDesc.getName().toUpperCase(Locale.ROOT));
if (fieldSchema == null) {
- incompatibleMsgs.add("column not exist in hive table:" + columnDesc.getName());
- continue;
+ if (!TimeDerivedColumnType.isTimeDerivedColumn(columnDesc.getName())) {
+ incompatibleMsgs.add("column not exist in hive table:" + columnDesc.getName());
+ }
+ continue;
}
if (!checkHiveTableFieldCompatible(fieldSchema, columnDesc)) {
String msg = String.format(Locale.ROOT,
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/StreamStorageQuery.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/StreamStorageQuery.java
index f2050c1..b01e183 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/StreamStorageQuery.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/StreamStorageQuery.java
@@ -106,7 +106,7 @@ public class StreamStorageQuery extends CubeStorageQuery {
ITupleIterator realTimeResult;
if (segmentsPlanner.canSkip(maxHistorySegmentTime, Long.MAX_VALUE)) {
- logger.info("Skip scan realTime data");
+ logger.info("Skip scan realTime data, {}", maxHistorySegmentTime);
realTimeResult = ITupleIterator.EMPTY_TUPLE_ITERATOR;
} else {
boolean isSelectAllQuery = isSelectAllQuery(request.getCuboid(), request.getGroups(), request.getFilter());
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
index 309bee9..c8d4197 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/StreamMetadataStore.java
@@ -28,21 +28,42 @@ import org.apache.kylin.stream.core.model.SegmentBuildState;
import org.apache.kylin.stream.core.model.StreamingCubeConsumeState;
import org.apache.kylin.stream.core.source.Partition;
+/**
+ * Independent metadata store for the realtime streaming cluster; this metadata is transient.
+ */
public interface StreamMetadataStore {
+ /**
+ * @return all streaming receivers, whether alive or not
+ */
List<Node> getReceivers();
+ /**
+ * @return all streaming cubes in the enabled state
+ */
List<String> getCubes();
+ /**
+ * @param cubeName the cube which needs to be enabled
+ */
void addStreamingCube(String cubeName);
+ /**
+ * @param cubeName the cube which needs to be disabled (stop consuming)
+ */
void removeStreamingCube(String cubeName);
StreamingCubeConsumeState getStreamingCubeConsumeState(String cubeName);
void saveStreamingCubeConsumeState(String cubeName, StreamingCubeConsumeState state);
+ /**
+ * @param receiver the streaming receiver which should be created
+ */
void addReceiver(Node receiver);
+ /**
+ * @param receiver the streaming receiver which needs to be removed
+ */
void removeReceiver(Node receiver);
void removeCubeAssignment(String cubeName);
@@ -115,4 +136,6 @@ public interface StreamMetadataStore {
SegmentBuildState getSegmentBuildState(String cubeName, String segmentName);
boolean removeSegmentBuildState(String cubeName, String segmentName);
+
+ default void reportStat(){}
}
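reportStat() is declared as a Java default method, so existing StreamMetadataStore implementations keep compiling and callers may invoke it unconditionally. A self-contained illustration of that pattern (names are hypothetical, not Kylin's):

    public class DefaultMethodDemo {
        interface MetadataStore {
            default void reportStat() { /* no-op unless overridden */ }
        }

        static class CountingStore implements MetadataStore {
            @Override
            public void reportStat() { System.out.println("read/write stats..."); }
        }

        public static void main(String[] args) {
            MetadataStore plain = new MetadataStore() { }; // inherits the no-op default
            plain.reportStat();                            // safe: does nothing
            new CountingStore().reportStat();              // prints stats
        }
    }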
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
index 4431b4c..3de0c82 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/ZookeeperStreamMetadataStore.java
@@ -21,6 +21,7 @@ package org.apache.kylin.stream.coordinator;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
@@ -61,6 +62,11 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
private String cubeRoot;
private String coordinatorRoot;
+ private AtomicLong readSuccess = new AtomicLong();
+ private AtomicLong readFail = new AtomicLong();
+ private AtomicLong writeSuccess = new AtomicLong();
+ private AtomicLong writeFail = new AtomicLong();
+
public ZookeeperStreamMetadataStore() {
this.client = StreamingUtils.getZookeeperClient();
this.zkRoot = StreamingUtils.STREAM_ZK_ROOT;
@@ -93,9 +99,13 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void removeCubeAssignment(String cubeName) {
+ logger.trace("Remove cube assignment {}.", cubeName);
+ checkPath(cubeName);
try {
client.delete().forPath(ZKPaths.makePath(cubeRoot, cubeName, CUBE_ASSIGNMENT));
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when remove cube assignment " + cubeName, e);
throw new StoreException(e);
}
@@ -114,8 +124,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
cubeAssignmentList.add(assignment);
}
}
+ readSuccess.getAndIncrement();
return cubeAssignmentList;
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get assignments", e);
throw new StoreException(e);
}
@@ -125,8 +137,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
public Map<Integer, Map<String, List<Partition>>> getAllReplicaSetAssignments() {
try {
List<CubeAssignment> cubeAssignmentList = getAllCubeAssignments();
+ readSuccess.getAndIncrement();
return AssignmentUtil.convertCubeAssign2ReplicaSetAssign(cubeAssignmentList);
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get assignments", e);
throw new StoreException(e);
}
@@ -136,8 +150,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
public Map<String, List<Partition>> getAssignmentsByReplicaSet(int replicaSetID) {
try {
Map<Integer, Map<String, List<Partition>>> replicaSetAssignmentsMap = getAllReplicaSetAssignments();
+ readSuccess.getAndIncrement();
return replicaSetAssignmentsMap.get(replicaSetID);
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get assignment for replica set " + replicaSetID, e);
throw new StoreException(e);
}
@@ -148,12 +164,15 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
try {
String cubeAssignmentPath = getCubeAssignmentPath(cubeName);
if (client.checkExists().forPath(cubeAssignmentPath) == null) {
+ logger.warn("Cannot find content at {}.", cubeAssignmentPath);
return null;
}
byte[] data = client.getData().forPath(cubeAssignmentPath);
+ readSuccess.getAndIncrement();
CubeAssignment assignment = CubeAssignment.deserializeCubeAssignment(data);
return assignment;
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get cube assignment for " + cubeName, e);
throw new StoreException(e);
}
@@ -164,11 +183,13 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
List<ReplicaSet> result = Lists.newArrayList();
try {
List<String> replicaSetIDs = client.getChildren().forPath(replicaSetRoot);
+ readSuccess.getAndIncrement();
for (String replicaSetID : replicaSetIDs) {
ReplicaSet replicaSet = getReplicaSet(Integer.parseInt(replicaSetID));
result.add(replicaSet);
}
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get replica sets", e);
throw new StoreException(e);
}
@@ -179,6 +200,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
public List<Integer> getReplicaSetIDs() {
try {
List<String> replicaSetIDs = client.getChildren().forPath(replicaSetRoot);
+ readSuccess.getAndIncrement();
return Lists.transform(replicaSetIDs, new Function<String, Integer>() {
@Nullable
@Override
@@ -187,6 +209,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
}
});
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get replica sets", e);
throw new StoreException(e);
}
@@ -214,11 +237,14 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
currMaxID = Collections.max(rsIDList);
}
int newReplicaSetID = currMaxID + 1;
+ logger.trace("Id of new replica set {} is {}.", rs, newReplicaSetID);
rs.setReplicaSetID(newReplicaSetID);
String replicaSetPath = ZKPaths.makePath(replicaSetRoot, String.valueOf(newReplicaSetID));
client.create().creatingParentsIfNeeded().forPath(replicaSetPath, serializeReplicaSet(rs));
+ writeSuccess.getAndIncrement();
return newReplicaSetID;
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when create replicaSet " + rs, e);
throw new StoreException(e);
}
@@ -230,7 +256,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
byte[] replicaSetData = serializeReplicaSet(rs);
client.setData().forPath(ZKPaths.makePath(replicaSetRoot, String.valueOf(rs.getReplicaSetID())),
replicaSetData);
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("error when update replicaSet " + rs, e);
throw new StoreException(e);
}
@@ -240,8 +268,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
public Node getCoordinatorNode() {
try {
byte[] nodeData = client.getData().forPath(coordinatorRoot);
+ readSuccess.getAndIncrement();
return JsonUtil.readValue(nodeData, Node.class);
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get coordinator leader", e);
throw new StoreException(e);
}
@@ -252,7 +282,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
try {
byte[] coordinatorBytes = JsonUtil.writeValueAsBytes(coordinator);
client.setData().forPath(coordinatorRoot, coordinatorBytes);
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when set coordinator leader to " + coordinator, e);
throw new StoreException(e);
}
@@ -260,6 +292,8 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void saveSourceCheckpoint(String cubeName, String segmentName, int rsID, String sourceCheckpoint) {
+ checkPath(cubeName, segmentName);
+ logger.trace("Save remote checkpoint {} {} {} with content {}.", cubeName, segmentName, rsID, sourceCheckpoint);
try {
String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_SRC_CHECKPOINT, segmentName,
String.valueOf(rsID));
@@ -269,7 +303,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
logger.warn("Checkpoint path already existed under path {}, overwrite with new one.", path);
}
client.setData().forPath(path, Bytes.toBytes(sourceCheckpoint));
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when save remote checkpoint for " + cubeName + " " + segmentName , e);
throw new StoreException(e);
}
@@ -293,8 +329,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
String sourcePos = Bytes.toString(checkpointBytes);
result.put(Integer.valueOf(child), sourcePos);
}
+ readSuccess.getAndIncrement();
return result;
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error to fetch remote checkpoint for " + cubeName + " " + segmentName, e);
throw new StoreException(e);
}
@@ -314,9 +352,10 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
if (replicaSetData != null && replicaSetData.length > 0) {
result = JsonUtil.readValue(Bytes.toString(replicaSetData), ReplicaSet.class);
}
-
+ readSuccess.getAndIncrement();
return result;
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get replica set " + rsID, e);
throw new StoreException(e);
}
@@ -326,7 +365,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
public void removeReplicaSet(int rsID) {
try {
client.delete().forPath(ZKPaths.makePath(replicaSetRoot, String.valueOf(rsID)));
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when remove replica set " + rsID, e);
throw new StoreException(e);
}
@@ -341,7 +382,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
Node node = Node.from(receiverName.replace('_', ':'));
result.add(node);
}
+ readSuccess.getAndIncrement();
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when fetch receivers", e);
throw new StoreException(e);
}
@@ -351,8 +394,11 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public List<String> getCubes() {
try {
- return client.getChildren().forPath(cubeRoot);
+ List<String> res = client.getChildren().forPath(cubeRoot);
+ readSuccess.getAndIncrement();
+ return res;
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when fetch cubes", e);
throw new StoreException(e);
}
@@ -360,12 +406,15 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void addStreamingCube(String cube) {
+ checkPath(cube);
try {
String path = ZKPaths.makePath(cubeRoot, cube);
if (client.checkExists().forPath(path) == null) {
client.create().creatingParentsIfNeeded().forPath(path);
}
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when add cube " + cube, e);
throw new StoreException(e);
}
@@ -373,12 +422,16 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void removeStreamingCube(String cube) {
+ logger.trace("Remove cube {}", cube);
+ checkPath(cube);
try {
String path = ZKPaths.makePath(cubeRoot, cube);
if (client.checkExists().forPath(path) != null) {
client.delete().deletingChildrenIfNeeded().forPath(ZKPaths.makePath(cubeRoot, cube));
}
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when remove cube " + cube, e);
throw new StoreException(e);
}
@@ -390,6 +443,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
String path = getCubeConsumeStatePath(cube);
if (client.checkExists().forPath(path) != null) {
byte[] cubeInfoData = client.getData().forPath(path);
+ readSuccess.getAndIncrement();
if (cubeInfoData != null && cubeInfoData.length > 0) {
return JsonUtil.readValue(cubeInfoData, StreamingCubeConsumeState.class);
} else {
@@ -399,6 +453,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
return StreamingCubeConsumeState.RUNNING;
}
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Error when get streaming cube consume state " + cube, e);
throw new StoreException(e);
}
@@ -406,6 +461,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void saveStreamingCubeConsumeState(String cube, StreamingCubeConsumeState state) {
+ checkPath(cube);
try {
String path = getCubeConsumeStatePath(cube);
if (client.checkExists().forPath(path) != null) {
@@ -413,7 +469,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
} else {
client.create().creatingParentsIfNeeded().forPath(path, JsonUtil.writeValueAsBytes(state));
}
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when save streaming cube consume state " + cube + " with " + state, e);
throw new StoreException(e);
}
@@ -421,12 +479,17 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void addReceiver(Node receiver) {
+ logger.trace("Add {}.", receiver);
try {
String receiverPath = ZKPaths.makePath(receiverRoot, receiver.toNormalizeString());
if (client.checkExists().forPath(receiverPath) == null) {
client.create().creatingParentsIfNeeded().forPath(receiverPath);
+ } else {
+ logger.warn("{} exists.", receiverPath);
}
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when add new receiver " + receiver, e);
throw new StoreException(e);
}
@@ -434,12 +497,15 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void removeReceiver(Node receiver) {
+ logger.trace("Remove {}.", receiver);
try {
String receiverPath = ZKPaths.makePath(receiverRoot, receiver.toNormalizeString());
if (client.checkExists().forPath(receiverPath) != null) {
client.delete().deletingChildrenIfNeeded().forPath(receiverPath);
}
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Error when remove receiver " + receiver, e);
throw new StoreException(e);
}
@@ -447,7 +513,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void saveNewCubeAssignment(CubeAssignment newCubeAssignment) {
- logger.info("Try saving new cube assignment for: {}.", newCubeAssignment);
+ logger.trace("Try saving new cube assignment for: {}.", newCubeAssignment);
try {
String path = getCubeAssignmentPath(newCubeAssignment.getCubeName());
if (client.checkExists().forPath(path) == null) {
@@ -456,7 +522,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
} else {
client.setData().forPath(path, CubeAssignment.serializeCubeAssignment(newCubeAssignment));
}
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Fail to save cube assignment", e);
throw new StoreException(e);
}
@@ -472,6 +540,8 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void addCompleteReplicaSetForSegmentBuild(String cubeName, String segmentName, int rsID) {
+ logger.trace("Add completed rs {} to {} {}", rsID, cubeName, segmentName);
+ checkPath(cubeName, segmentName);
try {
String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_BUILD_STATE, segmentName, "replica_sets",
String.valueOf(rsID));
@@ -480,7 +550,9 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
} else {
logger.warn("ReplicaSet id {} existed under path {}", rsID, path);
}
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Fail to add replicaSet Id to segment build state for " + segmentName + " " + rsID, e);
throw new StoreException(e);
}
@@ -488,11 +560,15 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public void updateSegmentBuildState(String cubeName, String segmentName, SegmentBuildState.BuildState state) {
+ logger.trace("Update {} {} to state {}", cubeName, segmentName, state);
+ checkPath(cubeName, segmentName);
try {
String stateStr = JsonUtil.writeValueAsString(state);
String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_BUILD_STATE, segmentName);
client.setData().forPath(path, Bytes.toBytes(stateStr));
+ writeSuccess.getAndIncrement();
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Fail to update segment build state for " + segmentName + " to " + state, e);
throw new StoreException(e);
}
@@ -506,6 +582,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
return Lists.newArrayList();
}
List<String> segments = client.getChildren().forPath(cubePath);
+ readSuccess.getAndIncrement();
List<SegmentBuildState> result = Lists.newArrayList();
for (String segment : segments) {
SegmentBuildState segmentState = doGetSegmentBuildState(cubePath, segment);
@@ -513,6 +590,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
}
return result;
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Fail to get segment build states " + cubeName, e);
throw new StoreException(e);
}
@@ -524,6 +602,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
String cubePath = getCubeBuildStatePath(cubeName);
return doGetSegmentBuildState(cubePath, segmentName);
} catch (Exception e) {
+ readFail.getAndIncrement();
logger.error("Fail to get segment build state for " + cubeName + " " +segmentName, e);
throw new StoreException(e);
}
@@ -533,6 +612,7 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
SegmentBuildState segmentState = new SegmentBuildState(segmentName);
String segmentPath = ZKPaths.makePath(cubePath, segmentName);
byte[] stateBytes = client.getData().forPath(segmentPath);
+ readSuccess.getAndIncrement();
SegmentBuildState.BuildState state;
if (stateBytes != null && stateBytes.length > 0) {
String stateStr = Bytes.toString(stateBytes);
@@ -549,16 +629,20 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
@Override
public boolean removeSegmentBuildState(String cubeName, String segmentName) {
+ logger.trace("Remove {} {}", cubeName, segmentName);
+ checkPath(cubeName, segmentName);
try {
String path = ZKPaths.makePath(cubeRoot, cubeName, CUBE_BUILD_STATE, segmentName);
if (client.checkExists().forPath(path) != null) {
client.delete().deletingChildrenIfNeeded().forPath(path);
+ writeSuccess.getAndIncrement();
return true;
} else {
logger.warn("Cube segment deep store state does not exisit!, path {} ", path);
return false;
}
} catch (Exception e) {
+ writeFail.getAndIncrement();
logger.error("Fail to remove cube segment deep store state " + cubeName + " " + segmentName, e);
throw new StoreException(e);
}
@@ -575,4 +659,30 @@ public class ZookeeperStreamMetadataStore implements StreamMetadataStore {
private String getCubeConsumeStatePath(String cubeName) {
return ZKPaths.makePath(cubeRoot, cubeName, CUBE_CONSUME_STATE);
}
+
+ private static final String reportTemplate = "[StreamMetadataStoreStats] read: {}; write: {}; read failed: {}; write failed: {}.";
+ private AtomicLong lastReport = new AtomicLong();
+ private static final long REPORT_DURATION = 300L * 1000;
+
+ @Override
+ public void reportStat() {
+ if (writeFail.get() > 0 || readFail.get() > 0) {
+ logger.warn(reportTemplate, readSuccess.get(), writeSuccess.get(), readFail.get(), writeFail.get());
+ } else {
+ if (System.currentTimeMillis() - lastReport.get() >= REPORT_DURATION) {
+ logger.debug(reportTemplate, readSuccess.get(), writeSuccess.get(), readFail.get(), writeFail.get());
+ } else {
+ return;
+ }
+ }
+ lastReport.set(System.currentTimeMillis());
+ }
+
+ private void checkPath(String... paths){
+ for (String path : paths){
+ if (path == null || path.length() == 0) {
+ throw new IllegalArgumentException("Illegal zookeeper path.");
+ }
+ }
+ }
}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/AssignmentsCache.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/AssignmentsCache.java
index 8cf3e60..f37469d 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/AssignmentsCache.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/AssignmentsCache.java
@@ -18,10 +18,13 @@
package org.apache.kylin.stream.coordinator.assign;
-import java.io.IOException;
import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalNotification;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.metadata.cachesync.Broadcaster;
import org.apache.kylin.metadata.cachesync.Broadcaster.Event;
@@ -30,21 +33,32 @@ import org.apache.kylin.stream.coordinator.StreamMetadataStore;
import org.apache.kylin.stream.coordinator.StreamMetadataStoreFactory;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.ReplicaSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+/**
+ * AssignmentsCache reduces read pressure on the metadata store.
+ *
+ * @see StreamMetadataStore
+ */
public class AssignmentsCache {
- private static volatile AssignmentsCache instance = new AssignmentsCache();
+ private static final Logger logger = LoggerFactory.getLogger(AssignmentsCache.class);
+
+ private static final AssignmentsCache instance = new AssignmentsCache();
private static final String ASSIGNMENT_ENTITY = "cube_assign";
private StreamMetadataStore metadataStore;
- private ConcurrentMap<String, List<ReplicaSet>> cubeAssignmentCache;
+ private Cache<String, List<ReplicaSet>> cubeAssignmentCache;
private AssignmentsCache() {
this.metadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
KylinConfig config = KylinConfig.getInstanceFromEnv();
- cubeAssignmentCache = Maps.newConcurrentMap();
- Broadcaster.getInstance(config).registerListener(new AssignCacheSyncListener(), ASSIGNMENT_ENTITY);
+ cubeAssignmentCache = CacheBuilder.newBuilder()
+ .removalListener((RemovalNotification<String, List<ReplicaSet>> notification) -> logger
+ .debug("{} is removed because {} ", notification.getKey(), notification.getCause()))
+ .expireAfterWrite(300, TimeUnit.SECONDS)
+ .build();
+ Broadcaster.getInstance(config).registerListener(new AssignCacheSyncListener(), ASSIGNMENT_ENTITY);
}
public static AssignmentsCache getInstance() {
@@ -52,32 +66,38 @@ public class AssignmentsCache {
}
public List<ReplicaSet> getReplicaSetsByCube(String cubeName) {
- if (cubeAssignmentCache.get(cubeName) == null) {
- synchronized (cubeAssignmentCache) {
- if (cubeAssignmentCache.get(cubeName) == null) {
- List<ReplicaSet> result = Lists.newArrayList();
-
- CubeAssignment assignment = metadataStore.getAssignmentsByCube(cubeName);
- for (Integer replicaSetID : assignment.getReplicaSetIDs()) {
- result.add(metadataStore.getReplicaSet(replicaSetID));
- }
- cubeAssignmentCache.put(cubeName, result);
+ List<ReplicaSet> ret;
+ try {
+ ret = cubeAssignmentCache.get(cubeName, () -> {
+ List<ReplicaSet> result = Lists.newArrayList();
+ CubeAssignment assignment = metadataStore.getAssignmentsByCube(cubeName);
+ if(assignment == null){
+ logger.error("Inconsistent metadata for assignment of {}, do check it.", cubeName);
+ return result;
+ }
+ for (Integer replicaSetID : assignment.getReplicaSetIDs()) {
+ result.add(metadataStore.getReplicaSet(replicaSetID));
}
- }
+ logger.trace("Update assignment with {}", result);
+ return result;
+ });
+ } catch (ExecutionException e){
+ logger.warn("Failed to load CubeAssignment", e);
+ throw new IllegalStateException("Failed to load CubeAssignment", e);
}
- return cubeAssignmentCache.get(cubeName);
+ return ret;
}
public void clearCubeCache(String cubeName) {
Broadcaster.getInstance(KylinConfig.getInstanceFromEnv()).announce(ASSIGNMENT_ENTITY,
Broadcaster.Event.UPDATE.getType(), cubeName);
- cubeAssignmentCache.remove(cubeName);
+ cubeAssignmentCache.invalidate(cubeName);
}
private class AssignCacheSyncListener extends Listener {
- public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey)
- throws IOException {
- cubeAssignmentCache.remove(cacheKey);
+ @Override
+ public void onEntityChange(Broadcaster broadcaster, String entity, Event event, String cacheKey) {
+ cubeAssignmentCache.invalidate(cacheKey);
}
}
}
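The switch from a plain ConcurrentMap to a Guava Cache buys per-entry TTL and atomic load-on-miss (concurrent readers of a missing key wait for a single loader). A standalone sketch of the same pattern with a hypothetical key and loader:

    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    import com.google.common.cache.RemovalNotification;

    public class TtlCacheDemo {
        public static void main(String[] args) throws ExecutionException {
            Cache<String, String> cache = CacheBuilder.newBuilder()
                    .expireAfterWrite(300, TimeUnit.SECONDS) // entries expire like the assignment cache
                    .removalListener((RemovalNotification<String, String> n) ->
                            System.out.println(n.getKey() + " removed: " + n.getCause()))
                    .build();
            // get() loads atomically on a miss; other callers block until the loader finishes.
            String v = cache.get("cube1", () -> "loaded-assignment");
            System.out.println(v);
            cache.invalidate("cube1"); // explicit eviction, as clearCubeCache does
        }
    }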
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/DefaultAssigner.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/DefaultAssigner.java
index 02bcc09..623fd05 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/DefaultAssigner.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/assign/DefaultAssigner.java
@@ -33,12 +33,16 @@ import org.apache.kylin.stream.core.model.ReplicaSet;
import org.apache.kylin.stream.core.source.Partition;
import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Default implementation for Assigner, assign according to the consumer task number for each cube
*
*/
public class DefaultAssigner implements Assigner {
+ private static final Logger logger = LoggerFactory.getLogger(DefaultAssigner.class);
+
@Override
public Map<Integer, Map<String, List<Partition>>> reBalancePlan(List<ReplicaSet> replicaSets,
List<StreamingCubeInfo> cubes, List<CubeAssignment> existingAssignments) {
@@ -191,6 +195,11 @@ public class DefaultAssigner implements Assigner {
private List<List<Partition>> splitCubeConsumeTasks(StreamingCubeInfo cube, int replicaSetNum) {
List<Partition> partitionsOfCube = cube.getStreamingTableSourceInfo().getPartitions();
int cubeConsumerTaskNum = getCubeConsumerTasks(cube, replicaSetNum);
+ int partitionsPerReceiver = partitionsOfCube.size() / cubeConsumerTaskNum;
+ if (partitionsPerReceiver > 3 && replicaSetNum > cubeConsumerTaskNum) {
+ logger.info(
+ "You may consider increasing `kylin.stream.cube-num-of-consumer-tasks` because you still have some quota left.");
+ }
List<List<Partition>> result = Lists.newArrayListWithCapacity(cubeConsumerTaskNum);
for (int i = 0; i < cubeConsumerTaskNum; i++) {
result.add(Lists.<Partition>newArrayList());
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
index 6f5bb0d..b0e3e92 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitter.java
@@ -17,10 +17,10 @@
*/
package org.apache.kylin.stream.coordinator.coordinate;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
@@ -36,6 +36,7 @@ import org.apache.kylin.metadata.model.Segments;
import org.apache.kylin.stream.coordinator.StreamingCubeInfo;
import org.apache.kylin.stream.coordinator.coordinate.annotations.NonSideEffect;
import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
+import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.model.CubeAssignment;
import org.apache.kylin.stream.core.model.SegmentBuildState;
import org.slf4j.Logger;
@@ -44,6 +45,7 @@ import org.slf4j.LoggerFactory;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListSet;
@@ -106,6 +108,7 @@ public class BuildJobSubmitter implements Runnable {
buildInfos = previousValue;
}
}
+ logger.trace("Add job {} of segment [{} - {}] to track.", segmentBuildJob.jobID, segmentBuildJob.cubeName, segmentBuildJob.segmentName);
boolean addSucceed = buildInfos.add(segmentBuildJob);
if (!addSucceed) {
logger.debug("Add {} failed because we have a duplicated one.", segmentBuildJob);
@@ -136,12 +139,15 @@ public class BuildJobSubmitter implements Runnable {
void doRun() {
checkTimes++;
- logger.info("\n----------------------------------------------------------- {}", checkTimes);
+ logger.debug("\n========================================================================= {}", checkTimes);
+ dumpSegmentBuildJobCheckList();
+ coordinator.getStreamMetadataStore().reportStat();
List<SegmentJobBuildInfo> successJobs = traceEarliestSegmentBuildJob();
for (SegmentJobBuildInfo successJob : successJobs) {
ConcurrentSkipListSet<SegmentJobBuildInfo> submittedBuildJobs = segmentBuildJobCheckList
.get(successJob.cubeName);
+ logger.trace("Remove job {} from check list.", successJob.jobID);
submittedBuildJobs.remove(successJob);
}
@@ -170,8 +176,11 @@ public class BuildJobSubmitter implements Runnable {
@NonSideEffect
List<SegmentJobBuildInfo> traceEarliestSegmentBuildJob() {
List<SegmentJobBuildInfo> successJobs = Lists.newArrayList();
- for (ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos : segmentBuildJobCheckList.values()) {
+ for (Map.Entry<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> entry :
+ segmentBuildJobCheckList.entrySet()) {
+ ConcurrentSkipListSet<SegmentJobBuildInfo> buildInfos = entry.getValue();
if (buildInfos.isEmpty()) {
+ logger.trace("Skip {}", entry.getKey());
continue;
}
@@ -186,6 +195,7 @@ public class BuildJobSubmitter implements Runnable {
continue;
}
ExecutableState jobState = cubingJob.getStatus();
+ logger.debug("Current job state {}", jobState);
if (ExecutableState.SUCCEED.equals(jobState)) {
CubeManager cubeManager = coordinator.getCubeManager();
CubeInstance cubeInstance = cubeManager.getCube(segmentBuildJob.cubeName).latestCopyForWrite();
@@ -205,11 +215,10 @@ public class BuildJobSubmitter implements Runnable {
logger.warn("Job:{} is error, exceed max retry. Kylin admin could resume it or discard it"
+ "(to let new building job be sumbitted) .", segmentBuildJob);
}
- } else {
- logger.debug("Current job state {}", jobState);
}
- } catch (Exception e) {
- logger.error("Error when check streaming segment job build state:" + segmentBuildJob, e);
+ } catch (StoreException storeEx) {
+ logger.error("Error when check streaming segment job build state:" + segmentBuildJob, storeEx);
+ throw storeEx;
}
}
return successJobs;
@@ -226,7 +235,7 @@ public class BuildJobSubmitter implements Runnable {
boolean ok = submitSegmentBuildJob(cubeName, segmentName);
allSubmited = allSubmited && ok;
if (!ok) {
- logger.debug("Failed to submit building job.");
+ logger.debug("Failed to submit building job for {}.", segmentName);
}
}
if (allSubmited) {
@@ -267,15 +276,16 @@ public class BuildJobSubmitter implements Runnable {
List<SegmentBuildState> segmentStates = coordinator.getStreamMetadataStore().getSegmentBuildStates(cubeName);
int inBuildingSegments = cubeInstance.getBuildingSegments().size();
int leftQuota = allowMaxBuildingSegments - inBuildingSegments;
+ boolean stillQuotaForNewSegment = true;
// Sort it so we can iterate segments from eariler one to newer one
Collections.sort(segmentStates);
for (int i = 0; i < segmentStates.size(); i++) {
-
+ boolean needRebuild = false;
if (leftQuota <= 0) {
- logger.info("No left quota to build segments for cube:{}", cubeName);
- break;
+ logger.info("No left quota to build segments for cube:{} at {}", cubeName, leftQuota);
+ stillQuotaForNewSegment = false;
}
SegmentBuildState segmentState = segmentStates.get(i);
@@ -293,26 +303,24 @@ public class BuildJobSubmitter implements Runnable {
// We already have a building job for current segment
if (segmentState.isInBuilding()) {
- boolean needRebuild = checkSegmentBuildingJob(segmentState, cubeName, cubeInstance);
+ needRebuild = checkSegmentBuildingJob(segmentState, cubeName, cubeInstance);
if (!needRebuild)
continue;
} else if (segmentState.isInWaiting()) {
- // The data has not been uploaded to remote completely, or job is discard
+ // The data may have been uploaded to remote storage completely, or the job was discarded.
+ // In both cases a building job should be submitted, so just let it go through.
}
boolean readyToBuild = checkSegmentIsReadyToBuild(segmentStates, i, cubeAssignedReplicaSets);
if (!readyToBuild) {
logger.debug("Segment {} {} is not ready to submit a building job.", cubeName, segmentState);
- // use break instead continue here, because we should transfer to next queue in sequential way (no jump the queue)
- break;
- } else {
+ } else if (stillQuotaForNewSegment || needRebuild) {
result.add(segmentState.getSegmentName());
leftQuota--;
}
}
- if (logger.isDebugEnabled() && result.isEmpty()) {
- logger.debug("Candidate {} : {}.", cubeName, String.join(", ", result));
+ if (logger.isDebugEnabled() && !result.isEmpty()) {
+ logger.debug("{} Candidate segment list to be built : {}.", cubeName, String.join(", ", result));
}
return result;
}
@@ -364,13 +372,14 @@ public class BuildJobSubmitter implements Runnable {
}
}
- if (!segmentExists) {
- logger.debug("Create segment for {} {} .", cubeName, segmentName);
- newSeg = coordinator.getCubeManager().appendSegment(cubeInstance,
- new SegmentRange.TSRange(segmentRange.getFirst(), segmentRange.getSecond()));
- } else {
- logger.info("Segment {} exists.", segmentName);
+ if (segmentExists) {
+ logger.warn("Segment {} exists, it will be forced deleted.", segmentName);
+ coordinator.getCubeManager().updateCubeDropSegments(cubeInstance, newSeg);
}
+
+ logger.debug("Create segment for {} {} .", cubeName, segmentName);
+ newSeg = coordinator.getCubeManager().appendSegment(cubeInstance,
+ new SegmentRange.TSRange(segmentRange.getFirst(), segmentRange.getSecond()));
// Step 2. create and submit new build job
DefaultChainedExecutable executable = getStreamingCubingJob(newSeg);
@@ -414,7 +423,11 @@ public class BuildJobSubmitter implements Runnable {
return false;
}
CubingJob cubingJob = (CubingJob) coordinator.getExecutableManager().getJob(jobId);
- Preconditions.checkNotNull(cubingJob, "CubingJob should not be null.");
+ if (cubingJob == null) {
+ // Cubing job is dropped manually, or metadata is broken.
+ logger.warn("Looks like cubing job is dropped manually, it will be submitted a new one.");
+ return true;
+ }
ExecutableState jobState = cubingJob.getStatus();
// If the job already succeeded and the HBase segment is in ready state, remove the build state
@@ -438,8 +451,13 @@ public class BuildJobSubmitter implements Runnable {
// If a job is discarded, we will try to resubmit it later.
if (ExecutableState.DISCARDED.equals(jobState)) {
- logger.info("Job:{} is discard, resubmit it later.", jobId);
- return true;
+ if (KylinConfig.getInstanceFromEnv().isAutoResubmitDiscardJob()) {
+ logger.debug("Job:{} is discard, resubmit it later.", jobId);
+ return true;
+ } else {
+ logger.debug("Job:{} is discard, please resubmit yourself.", jobId);
+ return false;
+ }
} else {
logger.info("Job:{} is in running, job state: {}.", jobId, jobState);
}
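
The branch above delegates the resubmit decision to a single config switch. If the getter follows the usual KylinConfigBase pattern it would look roughly like the sketch below; the property key and default value are guesses for illustration only, check the kylin-defaults.properties change in this commit for the real key:

    // Hypothetical accessor in KylinConfigBase; key and default are assumptions.
    public boolean isAutoResubmitDiscardJob() {
        return Boolean.parseBoolean(
                getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "false"));
    }
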
@@ -484,7 +502,12 @@ public class BuildJobSubmitter implements Runnable {
}
}
}
- return notCompleteReplicaSets.isEmpty();
+ if (notCompleteReplicaSets.isEmpty()) {
+ return true;
+ } else {
+ logger.debug("Not ready data for replica sets: {}", notCompleteReplicaSets);
+ return false;
+ }
}
}
@@ -495,4 +518,16 @@ public class BuildJobSubmitter implements Runnable {
public DefaultChainedExecutable getStreamingCubingJob(CubeSegment segment){
return new StreamingCubingEngine().createStreamingCubingJob(segment, "SYSTEM");
}
-}
+
+ void dumpSegmentBuildJobCheckList() {
+ if (!logger.isTraceEnabled())
+ return;
+ StringBuilder sb = new StringBuilder("Dump JobCheckList:\t");
+ for (Map.Entry<String, ConcurrentSkipListSet<SegmentJobBuildInfo>> cube : segmentBuildJobCheckList.entrySet()) {
+ sb.append(cube.getKey()).append(":").append(cube.getValue()).append("; ");
+ }
+ logger.trace(sb.toString());
+ }
+}
\ No newline at end of file
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
index 81b2e8c..32e5b88 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/ReceiverClusterManager.java
@@ -36,6 +36,7 @@ import org.apache.kylin.stream.coordinator.assign.AssignmentsCache;
import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicIdempotent;
import org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent;
import org.apache.kylin.stream.coordinator.exception.ClusterStateException;
+import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.apache.kylin.stream.core.consumer.ConsumerStartProtocol;
import org.apache.kylin.stream.core.model.AssignRequest;
import org.apache.kylin.stream.core.model.ConsumerStatsResponse;
@@ -72,7 +73,7 @@ import java.util.stream.Collectors;
*
* In a multi-step transaction, the following points should be considered carefully:
* 1. should it fail fast or continue when an exception is thrown
- * 2. should API(remote call) be synchronous or asynchronous
+ * 2. should the API (RPC) be synchronous or asynchronous
* 3. when the transaction fails, will the rollback always succeed
* 4. the transaction should be idempotent, so that when it fails, it can be fixed by retry
* </pre>
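
Point 4 is what makes blind retry a safe repair strategy. A generic sketch of such a retry wrapper, with no Kylin-specific API assumed:

    // Safe only because the wrapped call is idempotent: re-running a partially
    // completed attempt cannot corrupt state, it can only finish the work.
    // Assumes attempts > 0.
    static <T> T retryIdempotent(java.util.concurrent.Callable<T> action, int attempts) throws Exception {
        Exception last = null;
        for (int i = 0; i < attempts; i++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e;   // transient failure, try the same call again
            }
        }
        throw last;
    }
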
@@ -151,10 +152,10 @@ public class ReceiverClusterManager {
/**
* <pre>
- * Reassign action is a process which move some consumption task from some replica set to new replica set.
+ * Reassign action is a process which moves some consumption tasks from one replica set to some new replica sets.
*
* It is necessary in some cases such as:
- * - new topic partition was added
+ * - a new topic partition was added, or receivers cannot keep up with the produce rate, so we need to scale out
* - workload is not balanced between different replica sets
* - some nodes have to be taken offline, so their consumption tasks have to be transferred
* </pre>
@@ -470,15 +471,15 @@ public class ReceiverClusterManager {
*
* @param partitions specific topic partitions which the replica set should consume
* @param startConsumer whether the receiver should start consumption at once
- * @param mustAllSucceed if set to true, we should ensure all receivers has been correctly notified; false
+ * @param failFast if set to true, ensure all receivers have been correctly notified (fail-fast); if false,
* ensure that at least one receiver has been correctly notified
*
* @throws IOException thrown when the assignment fails
*/
@NotAtomicIdempotent
void assignCubeToReplicaSet(ReplicaSet rs, String cubeName, List<Partition> partitions, boolean startConsumer,
- boolean mustAllSucceed) throws IOException {
- boolean hasNodeAssigned = false;
+ boolean failFast) throws IOException {
+ boolean atLeastOneAssigned = false;
IOException exception = null;
AssignRequest assignRequest = new AssignRequest();
assignRequest.setCubeName(cubeName);
@@ -487,16 +488,16 @@ public class ReceiverClusterManager {
for (final Node node : rs.getNodes()) {
try {
getCoordinator().assignToReceiver(node, assignRequest);
- hasNodeAssigned = true;
+ atLeastOneAssigned = true;
} catch (IOException e) {
- if (mustAllSucceed) {
+ if (failFast) {
throw e;
}
exception = e;
- logger.error("cube:" + cubeName + " consumers start fail for node:" + node.toString(), e);
+ logger.error("Cube:" + cubeName + " consumers start fail for node:" + node.toString(), e);
}
}
- if (!hasNodeAssigned) {
+ if (!atLeastOneAssigned) {
if (exception != null) {
throw exception;
}
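
Restated compactly, the two notification modes of assignCubeToReplicaSet are unanimity (failFast) versus at-least-one success. In the sketch below, send is a stand-in for coordinator.assignToReceiver and Node is the Kylin type used above:

    boolean notifyReplicaNodes(List<Node> nodes, boolean failFast) throws IOException {
        IOException last = null;
        int succeeded = 0;
        for (Node n : nodes) {
            try {
                send(n);
                succeeded++;
            } catch (IOException e) {
                if (failFast) throw e;   // unanimity required: surface the failure immediately
                last = e;                // otherwise remember it and keep notifying
            }
        }
        if (succeeded == 0 && last != null) throw last;   // nobody got the assignment
        return succeeded == nodes.size();
    }
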
@@ -506,11 +507,12 @@ public class ReceiverClusterManager {
/**
* When a segment build job succeeds, some follow-up work is needed to deliver the new segment to the historical part.
*
+ * @throws org.apache.kylin.stream.coordinator.exception.StoreException thrown when writing metadata fails
* @return true if promote succeed, else false
*/
@NotAtomicIdempotent
protected boolean segmentBuildComplete(CubingJob cubingJob, CubeInstance cubeInstance, CubeSegment cubeSegment,
- SegmentJobBuildInfo segmentBuildInfo) throws IOException {
+ SegmentJobBuildInfo segmentBuildInfo) {
String cubeName = segmentBuildInfo.cubeName;
String segmentName = segmentBuildInfo.segmentName;
@@ -521,8 +523,14 @@ public class ReceiverClusterManager {
}
if (!SegmentStatusEnum.READY.equals(cubeSegment.getStatus())) {
- promoteNewSegment(cubingJob, cubeInstance, cubeSegment);
+ try {
+ promoteNewSegment(cubingJob, cubeInstance, cubeSegment);
+ } catch (IOException storeException) {
+ throw new StoreException("Promote failed because of metadata store.", storeException);
+ }
logger.debug("Promote {} succeed.", segmentName);
+ } else {
+ logger.debug("Segment status is: {}", cubeSegment.getStatus());
}
// Step 2. delete local segment files in receiver side because these are useless now
@@ -533,8 +541,9 @@ public class ReceiverClusterManager {
ReplicaSet rs = getCoordinator().getStreamMetadataStore().getReplicaSet(replicaSetID);
for (Node node : rs.getNodes()) {
try {
- getCoordinator().notifyReceiverBuildSuccess(node, cubeName, segmentName);
+ getCoordinator().notifyReceiverBuildSuccess(node, cubeName, segmentName); // Idempotent
} catch (IOException e) {
+ // It is OK to just log here; the unused segment cache on the receiver will be deleted in the next call
logger.error("error when removing cube segment for receiver:" + node, e);
}
}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
index 7c3c575..f234c2e 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/coordinate/StreamingCoordinator.java
@@ -113,7 +113,7 @@ public class StreamingCoordinator implements CoordinatorClient {
private StreamingCoordinator() {
this.streamMetadataStore = StreamMetadataStoreFactory.getStreamMetaDataStore();
- clusterManager = new ReceiverClusterManager(this);
+ this.clusterManager = new ReceiverClusterManager(this);
this.receiverAdminClient = new HttpReceiverAdminClient();
this.assigner = getAssigner();
this.zkClient = StreamingUtils.getZookeeperClient();
@@ -195,7 +195,7 @@ public class StreamingCoordinator implements CoordinatorClient {
for (Node receiver : rs.getNodes()) {
try {
unAssignToReceiver(receiver, request);
- } catch (Exception e) {
+ } catch (IOException e) {
logger.error("Exception throws when unAssign receiver", e);
unAssignedFailReceivers.add(receiver);
}
@@ -297,6 +297,7 @@ public class StreamingCoordinator implements CoordinatorClient {
}
}
+ @NotAtomicIdempotent
private void doAssignCube(String cubeName, CubeAssignment assignment) {
Set<ReplicaSet> successRS = Sets.newHashSet();
try {
@@ -306,11 +307,11 @@ public class StreamingCoordinator implements CoordinatorClient {
false);
successRS.add(rs);
}
- logger.debug("Committing assign {} transaction.", cubeName);
+ logger.debug("Committing assignment {} transaction.", cubeName);
streamMetadataStore.saveNewCubeAssignment(assignment);
- logger.debug("Committed assign {} transaction.", cubeName);
+ logger.debug("Committed assignment {} transaction.", cubeName);
} catch (Exception e) {
- // roll back the success group assignment
+ logger.debug("Starting roll back success receivers.");
for (ReplicaSet rs : successRS) {
UnAssignRequest request = new UnAssignRequest();
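
The hunk ends mid-rollback; its general shape is the compensation pattern sketched below, where doAssign, undoAssign and commitAssignment are illustrative stand-ins for the receiver RPCs and the metadata write, and undo is best-effort, relying on idempotence:

    void assignAll(List<Integer> replicaSetIds) {
        List<Integer> done = new ArrayList<>();
        try {
            for (Integer rsId : replicaSetIds) {
                doAssign(rsId);
                done.add(rsId);
            }
            commitAssignment();                 // single commit point in the metadata store
        } catch (Exception e) {
            for (Integer rsId : done) {
                try {
                    undoAssign(rsId);           // compensate the partial work
                } catch (Exception rollbackFailure) {
                    // deliberately swallowed; ClusterDoctor repairs leftovers later
                }
            }
            throw new IllegalStateException("assign failed and was rolled back", e);
        }
    }
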
@@ -350,6 +351,7 @@ public class StreamingCoordinator implements CoordinatorClient {
int replicaSetID = streamMetadataStore.createReplicaSet(rs);
try {
for (Node receiver : rs.getNodes()) {
+ logger.trace("Notify {} that it has been added to {} .", receiver, replicaSetID);
addReceiverToReplicaSet(receiver, replicaSetID);
}
} catch (IOException e) {
@@ -379,7 +381,7 @@ public class StreamingCoordinator implements CoordinatorClient {
for (ReplicaSet other : allReplicaSet) {
if (other.getReplicaSetID() != replicaSetID) {
if (other.getNodes().contains(receiver)) {
- logger.error("error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID,
+ logger.error("Error add Node {} to replicaSet {}, already exist in replicaSet {} ", nodeID,
replicaSetID, other.getReplicaSetID());
throw new IllegalStateException("Node exists in ReplicaSet!");
}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
index 2c4fce2..5177f78 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/doctor/ClusterStateChecker.java
@@ -18,17 +18,65 @@
package org.apache.kylin.stream.coordinator.doctor;
+import org.apache.hadoop.hbase.util.Threads;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
/**
- * <pre>
- * Basic step of this class:
- * 1. stop coordinator to avoid underlying concurrency issue
- * 2. check inconsistent state of all receiver cluster
- * 3. send summary via mail to kylin admin
- * 4. if need, call ClusterDoctor to repair inconsistent issue
- * </pre>
+ *
+ * <h2>Things to check</h2>
+ * <dl>
+ * <dt>Zookeeper Available</dt>
+ * <dd>Check availability</dd>
+ *
+ * <dt>Receiver Available</dt>
+ * <dd>Check availability</dd>
+ *
+ * <dt>Metadata RW failure</dt>
+ * <dd>Coordinator writes consistent state into metadata; statistics of r/w failures
+ * are an important indicator of cluster health</dd>
+ *
+ * <dt>RPC failure</dt>
+ * <dd>Coordinator sends requests to streaming receivers; statistics
+ * of failures are an important indicator of cluster health</dd>
+ *
+ * <dt>Segment Build Job & Promotion Failure</dt>
+ *
+ *
+ * <dt>Cube Assignment Consistency</dt>
+ * <dd>If a receiver's behavior is not aligned with the central metadata, it indicates something is wrong.</dd>
+ *
+ * <dt>Active segments Count & Immutable segments Count</dt>
+ * <dd>If receivers have too many active segments, it indicates that promotion is blocked;
+ * for each query received, too many segment/fragment files have to be scanned,
+ * so performance will be impacted badly.</dd>
+ *
+ * <dt>Consumption lag</dt>
+ * <dd>If receivers cannot keep up with the producer's rate, many active segments will accumulate,
+ * and performance will be impacted badly.</dd>
+ *
+ * </dl>
+ *
+ * <h2>Check and Report</h2>
+ * Basic steps:
+ * <ol>
+ * <li> stop the coordinator to avoid underlying concurrency issues </li>
+ * <li> check the inconsistent state of the whole receiver cluster </li>
+ * <li> send a summary via mail to the kylin admin </li>
+ * <li> if needed, call ClusterDoctor to repair inconsistency issues </li>
+ * </ol>
+ *
* @see org.apache.kylin.stream.coordinator.coordinate.annotations.NotAtomicAndNotIdempotent
* @see ClusterDoctor
*/
public class ClusterStateChecker {
- // TO BE IMPLEMENTED
+
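+ // Pool for running cluster-state checks concurrently; daemon threads, so a hung check cannot block JVM shutdown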
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(1, 10, 20, TimeUnit.MINUTES,
+ new LinkedBlockingQueue<Runnable>(10 * 100), //
+ Threads.newDaemonThreadFactory("Cluster-checker-"));
+
}
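
Nothing in this commit wires the pool up yet. One plausible use, fanning out the checks listed in the javadoc and gathering results for the summary mail, is sketched below; checkZookeeper, checkReceivers and checkAssignmentConsistency are assumed helper methods, and imports (java.util.*, java.util.concurrent.*) are elided:

    String runChecks() throws Exception {
        List<Future<String>> results = new ArrayList<>();
        for (Callable<String> check : Arrays.<Callable<String>>asList(
                this::checkZookeeper, this::checkReceivers, this::checkAssignmentConsistency)) {
            results.add(tpe.submit(check));
        }
        StringBuilder summary = new StringBuilder();
        for (Future<String> f : results) {
            summary.append(f.get()).append('\n');   // gather every finding before reporting
        }
        return summary.toString();
    }
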
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
index 044534c..fa9d19e 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/BuildJobSubmitterTest.java
@@ -28,6 +28,7 @@ import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.metadata.model.SegmentRange;
import org.apache.kylin.metadata.model.SegmentStatusEnum;
import org.apache.kylin.metadata.model.Segments;
+import org.apache.kylin.stream.coordinator.exception.StoreException;
import org.junit.Test;
import java.io.IOException;
@@ -90,12 +91,12 @@ public class BuildJobSubmitterTest extends StreamingTestBase {
assertEquals(1, buildJobSubmitter.getCubeCheckList().size());
}
- @Test
+ @Test(expected = StoreException.class)
@SuppressWarnings("unchecked")
- public void testTraceEarliestSegmentBuildJob2() throws IOException {
+ public void testTraceEarliestSegmentBuildJob2() {
beforeTestTraceEarliestSegmentBuildJob();
when(clusterManager.segmentBuildComplete(isA(CubingJob.class), isA(CubeInstance.class), isA(CubeSegment.class),
- isA(SegmentJobBuildInfo.class))).thenThrow(IOException.class);
+ isA(SegmentJobBuildInfo.class))).thenThrow(StoreException.class);
BuildJobSubmitter buildJobSubmitter = new BuildJobSubmitter(streamingCoordinator);
buildJobSubmitter.restore();
List<SegmentJobBuildInfo> jobList = buildJobSubmitter.traceEarliestSegmentBuildJob();
diff --git a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
index c7b1ac5..408173a 100644
--- a/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
+++ b/stream-coordinator/src/test/java/org/apache/kylin/stream/coordinator/coordinate/StreamingTestBase.java
@@ -262,7 +262,6 @@ public class StreamingTestBase extends LocalFileMetadataTestCase {
ReceiverClusterManager clusterManager = mock(ReceiverClusterManager.class);
when(clusterManager.getCoordinator()).thenReturn(coordinator);
return clusterManager;
- // return new ReceiverClusterManager(coordinator);
}
KylinConfig stubKylinConfig() {
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java b/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
index b040691..9007d35 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/client/ReceiverAdminClient.java
@@ -35,7 +35,6 @@ import org.apache.kylin.stream.core.model.stats.ReceiverStats;
/**
* StreamingCoordinator sends admin requests to a specific receiver
* (received by org.apache.kylin.stream.server.rest.controller.AdminController).
- *
*/
public interface ReceiverAdminClient {
@@ -103,6 +102,10 @@ public interface ReceiverAdminClient {
/**
* Ask receiver to stop consumption and convert all segments to Immutable.
+ *
+ * If a replica set is removed from a consumption task, the coordinator will notify
+ * its receivers and ask them to upload all data asap.
+ * This often happens in a reassign action.
*/
void makeCubeImmutable(Node receiver, String cubeName) throws IOException;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
index 256b519..f5d70e3 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/consumer/StreamingConsumerChannel.java
@@ -80,6 +80,7 @@ public class StreamingConsumerChannel implements Runnable {
public void start() {
this.consumerThread = new Thread(this, cubeName + "_channel");
+ consumerThread.setPriority(Thread.MAX_PRIORITY); // Raise the consumer thread priority to keep the ingest rate stable
connector.open();
consumerThread.start();
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
index 10e7b82..52658d8 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingDataQueryPlanner.java
@@ -28,20 +28,33 @@ import org.apache.kylin.metadata.model.TblColRef;
import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker;
import org.apache.kylin.stream.core.util.CompareFilterTimeRangeChecker.CheckResult;
import org.apache.kylin.dimension.TimeDerivedColumnType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.TimeZone;
/**
* Scan planner for Streaming data segments, take derived time columns into consideration.
*/
public class StreamingDataQueryPlanner {
+ private static final Logger logger = LoggerFactory.getLogger(StreamingDataQueryPlanner.class);
+
protected CubeDesc cubeDesc;
protected TupleFilter filter;
protected TupleFilter flatFilter;
+ private final long timezoneOffset;
public StreamingDataQueryPlanner(CubeDesc cubeDesc, TupleFilter filter) {
this.cubeDesc = cubeDesc;
this.filter = filter;
this.flatFilter = flattenToOrAndFilter(filter);
+ String timezoneName = cubeDesc.getConfig().getStreamingDerivedTimeTimezone();
+ if (timezoneName == null || timezoneName.length() == 0) {
+ timezoneOffset = 0;
+ } else {
+ timezoneOffset = TimeZone.getTimeZone(timezoneName).getRawOffset();
+ }
}
public boolean canSkip(long timeStart, long timeEnd) {
@@ -81,7 +94,7 @@ public class StreamingDataQueryPlanner {
TimeDerivedColumnType timeDerivedColumnType = TimeDerivedColumnType.getTimeDerivedColumnType(column
.getName());
- CheckResult checkResult = timeRangeChecker.check(comp, timeDerivedColumnType);
+ CheckResult checkResult = timeRangeChecker.check(comp, timeDerivedColumnType, timezoneOffset);
if (checkResult == CheckResult.EXCLUDED) {
return true;
} else {
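
The offset computation in this file can be reproduced with plain JDK calls; note that getRawOffset returns the zone's fixed offset in milliseconds and deliberately ignores daylight saving time. A runnable illustration, where "GMT+8" stands in for the configured value:

    import java.util.TimeZone;

    public class OffsetDemo {
        public static void main(String[] args) {
            String timezoneName = "GMT+8";   // stands in for getStreamingDerivedTimeTimezone()
            long offset = (timezoneName == null || timezoneName.isEmpty())
                    ? 0
                    : TimeZone.getTimeZone(timezoneName).getRawOffset();
            System.out.println(offset);      // prints 28800000, i.e. 8 hours in millis
        }
    }
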
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
index 110cc8e..87e4c5f 100755
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/query/StreamingTupleConverter.java
@@ -55,8 +55,8 @@ public class StreamingTupleConverter {
final List<MeasureType.IAdvMeasureFiller> advMeasureFillers;
final List<Integer> advMeasureIndexInGTValues;
- final boolean autoTimezone = KylinConfig.getInstanceFromEnv().isStreamingAutoJustTimezone();
- private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone())
+ final boolean autoTimezone = KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone().length() > 0;
+ private static final long TIME_ZONE_OFFSET = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getStreamingDerivedTimeTimezone())
.getRawOffset();
public StreamingTupleConverter(ResponseResultSchema schema, TupleInfo returnTupleInfo) {
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/source/IStreamingMessageParser.java b/stream-core/src/main/java/org/apache/kylin/stream/core/source/IStreamingMessageParser.java
index 3e27d89..5afef78 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/source/IStreamingMessageParser.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/source/IStreamingMessageParser.java
@@ -20,6 +20,9 @@ package org.apache.kylin.stream.core.source;
import org.apache.kylin.stream.core.model.StreamingMessage;
+/**
+ * Extension point for parsing raw source messages into StreamingMessage
+ */
public interface IStreamingMessageParser<T> {
StreamingMessage parse(T sourceMessage);
}
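
Because the extension point is generic in the raw message type, parsers compose. A minimal sketch of a wrapper that uses only the interface shown above; CountingParser is illustrative, not part of Kylin:

    public class CountingParser<T> implements IStreamingMessageParser<T> {
        private final IStreamingMessageParser<T> delegate;
        private final java.util.concurrent.atomic.AtomicLong failures =
                new java.util.concurrent.atomic.AtomicLong();

        public CountingParser(IStreamingMessageParser<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public StreamingMessage parse(T sourceMessage) {
            try {
                return delegate.parse(sourceMessage);
            } catch (RuntimeException e) {
                failures.incrementAndGet();   // track parse health without changing the API
                throw e;
            }
        }

        public long failureCount() {
            return failures.get();
        }
    }
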
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
index 9b0a912..c2f7641 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFileSearcher.java
@@ -25,6 +25,7 @@ import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
+import java.util.TimeZone;
import org.apache.kylin.common.util.ByteArray;
import org.apache.kylin.common.util.Dictionary;
@@ -79,6 +80,11 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
@Override
public void search(StreamingSearchContext searchContext, ResultCollector collector) throws IOException {
+ String timezone = searchContext.getCubeDesc().getConfig().getStreamingDerivedTimeTimezone();
+ long timezoneOffset = 0;
+ if (timezone != null && timezone.length() > 0) {
+ timezoneOffset = TimeZone.getTimeZone(timezone).getRawOffset();
+ }
FragmentMetaInfo fragmentMetaInfo = fragmentData.getFragmentMetaInfo();
CuboidMetaInfo cuboidMetaInfo;
if (searchContext.hitBasicCuboid()) {
@@ -116,8 +122,8 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
Set<TblColRef> unEvaluateDims = Sets.newHashSet();
TupleFilter fragmentFilter = null;
if (searchContext.getFilter() != null) {
- fragmentFilter = convertFilter(fragmentMetaInfo, searchContext.getFilter(), recordCodec,
- dimensions, new CubeDimEncMap(cubeDesc, dictMap), unEvaluateDims);
+ fragmentFilter = convertFilter(fragmentMetaInfo, searchContext.getFilter(), recordCodec, dimensions,
+ new CubeDimEncMap(cubeDesc, dictMap), unEvaluateDims, timezoneOffset);
}
if (ConstantTupleFilter.TRUE == fragmentFilter) {
fragmentFilter = null;
@@ -134,8 +140,8 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
}
private TupleFilter convertFilter(FragmentMetaInfo fragmentMetaInfo, TupleFilter rootFilter,
- ColumnarRecordCodec recordCodec, final TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, //
- final Set<TblColRef> unEvaluableColumnCollector) {
+ ColumnarRecordCodec recordCodec, final TblColRef[] dimensions, final IDimensionEncodingMap dimEncodingMap, //
+ final Set<TblColRef> unEvaluableColumnCollector, long timezoneOffset) {
Map<TblColRef, Integer> colMapping = Maps.newHashMap();
for (int i = 0; i < dimensions.length; i++) {
colMapping.put(dimensions[i], i);
@@ -147,6 +153,7 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
filter = builtInFunctionTransformer.transform(filter);
FragmentFilterConverter fragmentFilterConverter = new FragmentFilterConverter(fragmentMetaInfo, unEvaluableColumnCollector,
colMapping, recordCodec);
+ fragmentFilterConverter.setTimezoneOffset(timezoneOffset);
filter = fragmentFilterConverter.transform(filter);
filter = new FilterOptimizeTransformer().transform(filter);
@@ -159,6 +166,7 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
private CompareFilterTimeRangeChecker filterTimeRangeChecker;
private ColumnarRecordCodec recordCodec;
transient ByteBuffer buf;
+ private long timezoneOffset = 0;
public FragmentFilterConverter(FragmentMetaInfo fragmentMetaInfo, Set<TblColRef> unEvaluableColumnCollector,
Map<TblColRef, Integer> colMapping, ColumnarRecordCodec recordCodec) {
@@ -225,7 +233,7 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
if (TimeDerivedColumnType.isTimeDerivedColumn(externalCol.getName()) && filterTimeRangeChecker != null) {
CheckResult checkResult = filterTimeRangeChecker.check(oldCompareFilter,
- TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName()));
+ TimeDerivedColumnType.getTimeDerivedColumnType(externalCol.getName()), timezoneOffset);
if (checkResult == CheckResult.INCLUDED) {
return ConstantTupleFilter.TRUE;
} else if (checkResult == CheckResult.EXCLUDED) {
@@ -354,7 +362,8 @@ public class FragmentFileSearcher implements IStreamingGTSearcher {
}
}
-
+ public void setTimezoneOffset(long timezoneOffset) {
+ this.timezoneOffset = timezoneOffset;
+ }
}
-
}
\ No newline at end of file
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
index c01459d..52472b2 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/CompareFilterTimeRangeChecker.java
@@ -57,10 +57,17 @@ public class CompareFilterTimeRangeChecker {
this.endClose = endClose;
}
- public CheckResult check(CompareTupleFilter compFilter, TimeDerivedColumnType timeDerivedColumnType) {
+ public CheckResult check(CompareTupleFilter compFilter, TimeDerivedColumnType timeDerivedColumnType, long timezoneOffset) {
Object timestampValue = compFilter.getFirstValue();
Set conditionValues = compFilter.getValues();
- Pair<Long, Long> timeUnitRange = timeDerivedColumnType.getTimeUnitRange(timestampValue);
+ Pair<Long, Long> timeUnitRange;
+ if (timeDerivedColumnType != TimeDerivedColumnType.MINUTE_START
+ && timeDerivedColumnType != TimeDerivedColumnType.HOUR_START) {
+ timeUnitRange = timezoneOffset == 0 ? timeDerivedColumnType.getTimeUnitRange(timestampValue)
+ : timeDerivedColumnType.getTimeUnitRangeTimezoneAware(timestampValue, timezoneOffset);
+ } else {
+ timeUnitRange = timeDerivedColumnType.getTimeUnitRange(timestampValue);
+ }
switch (compFilter.getOperator()) {
case EQ:
return checkForEqValue(timeUnitRange);
@@ -105,7 +112,7 @@ public class CompareFilterTimeRangeChecker {
}
return CheckResult.OVERLAP;
case IN:
- return checkForInValues(timeDerivedColumnType, conditionValues);
+ return checkForInValues(timeDerivedColumnType, conditionValues, timezoneOffset);
default:
return CheckResult.OVERLAP;
}
@@ -121,10 +128,10 @@ public class CompareFilterTimeRangeChecker {
return CheckResult.OVERLAP;
}
- private CheckResult checkForInValues(TimeDerivedColumnType timeDerivedColumnType, Collection<Object> values) {
+ private CheckResult checkForInValues(TimeDerivedColumnType timeDerivedColumnType, Collection<Object> values, long timezoneOffset) {
CheckResult result = null;
for (Object timestampValue : values) {
- Pair<Long, Long> timeUnitRange = timeDerivedColumnType.getTimeUnitRange(timestampValue);
+ Pair<Long, Long> timeUnitRange = timeDerivedColumnType.getTimeUnitRangeTimezoneAware(timestampValue, timezoneOffset);
CheckResult checkResult = checkForEqValue(timeUnitRange);
if (result == null) {
result = checkResult;
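
Why the offset matters only for day-level (and coarser) columns: a whole-hour zone offset can move a timestamp across a day boundary, but never across a minute or hour boundary. A runnable illustration of the shifted day-range arithmetic; the math here is illustrative, Kylin's getTimeUnitRangeTimezoneAware is the real implementation:

    import java.util.concurrent.TimeUnit;

    public class DayRangeDemo {
        static final long DAY = TimeUnit.DAYS.toMillis(1);

        // Start of the local day containing ts, expressed in UTC millis.
        static long dayStart(long ts, long offsetMs) {
            return Math.floorDiv(ts + offsetMs, DAY) * DAY - offsetMs;
        }

        public static void main(String[] args) {
            long ts = 1575590400000L;                    // 2019-12-06 00:00:00 UTC
            long gmt8 = TimeUnit.HOURS.toMillis(8);
            System.out.println(dayStart(ts, 0));         // 1575590400000: the UTC day start
            System.out.println(dayStart(ts, gmt8));      // 1575561600000: the GMT+8 day began 8h earlier
        }
    }
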
diff --git a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
index 4a504aa..cb1bd6b 100644
--- a/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
+++ b/stream-source-kafka/src/main/java/org/apache/kylin/stream/source/kafka/TimedJsonStreamParser.java
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
+import java.util.TimeZone;
import java.util.TreeMap;
import org.apache.commons.lang3.StringUtils;
@@ -65,6 +66,7 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
private String tsColName = "timestamp";
private String tsParser = null;
private AbstractTimeParser streamTimeParser;
+ private long timeZoneOffset = 0;
/**
* the path of {"user" : {"name": "kite", "sex":"female"}}
@@ -76,6 +78,9 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
public TimedJsonStreamParser(CubeDesc cubeDesc, MessageParserInfo parserInfo) {
this(new CubeJoinedFlatTableDesc(cubeDesc).getAllColumns(), parserInfo);
+ String timeZone = cubeDesc.getConfig().getStreamingDerivedTimeTimezone();
+ if (timeZone.length() > 0)
+ timeZoneOffset = TimeZone.getTimeZone(timeZone).getRawOffset();
}
public TimedJsonStreamParser(List<TblColRef> cols, MessageParserInfo parserInfo) {
@@ -144,7 +149,11 @@ public final class TimedJsonStreamParser implements IStreamingMessageParser<Cons
String columnName = column.getName();
TimeDerivedColumnType columnType = TimeDerivedColumnType.getTimeDerivedColumnType(columnName);
if (columnType != null) {
- result.add(String.valueOf(columnType.normalize(t)));
+ if (timeZoneOffset > 0 && TimeDerivedColumnType.isTimeDerivedColumnAboveDayLevel(columnName)) {
+ result.add(String.valueOf(columnType.normalize(t + timeZoneOffset)));
+ } else {
+ result.add(String.valueOf(columnType.normalize(t)));
+ }
} else {
Object value = root.get(columnName.toLowerCase(Locale.ROOT));
if (value == null) {