You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2018/12/27 03:06:35 UTC
[kylin] branch realtime-streaming updated: KYLIN-3742 Fix
DataRequest for NPE and add some javadoc
This is an automated email from the ASF dual-hosted git repository.
magang pushed a commit to branch realtime-streaming
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/realtime-streaming by this push:
new c406029 KYLIN-3742 Fix DataRequest for NPE and add some javadoc
c406029 is described below
commit c406029bf6fea7dc78417aa4ff20a13c092e0079
Author: hit-lacus <hi...@126.com>
AuthorDate: Wed Dec 26 20:37:22 2018 +0800
KYLIN-3742 Fix DataRequest for NPE and add some javadoc
---
.../apache/kylin/metadata/model/ParameterDesc.java | 14 +++++++--
.../controller/StreamingCoordinatorController.java | 34 ++++++++++++++--------
.../stream/rpc/HttpStreamDataSearchClient.java | 11 ++++---
.../kylin/stream/coordinator/Coordinator.java | 9 ++++--
.../kylin/stream/core/model/DataRequest.java | 7 +++--
.../core/storage/StreamingSegmentManager.java | 2 ++
.../apache/kylin/stream/core/util/RestService.java | 10 +++++++
.../kylin/stream/server/StreamingServer.java | 28 ++++++++++++------
.../server/rest/controller/DataController.java | 3 +-
.../{PolicyInfo.java => RetentionPolicyInfo.java} | 11 ++++++-
10 files changed, 92 insertions(+), 37 deletions(-)
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index f757503..45af397 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -149,8 +149,11 @@ public class ParameterDesc implements Serializable {
if (p.isColumnType()) {
if (q.isColumnType() == false)
return false;
- if (q.getColRef().equals(p.getColRef()) == false)
+ if (q.getColRef() != null && q.getColRef().equals(p.getColRef()) == false)
return false;
+ if (!(q.getType().equals(p.getType()) && q.getValue().equals(q.getValue()))) {
+ return false;
+ }
} else {
if (q.isColumnType() == true)
return false;
@@ -185,8 +188,13 @@ public class ParameterDesc implements Serializable {
@Override
public String toString() {
- String thisStr = isColumnType() ? colRef.toString() : value;
- return nextParameter == null ? thisStr : thisStr + "," + nextParameter.toString();
+ String tmp = null;
+ if (isColumnType() && colRef != null) {
+ tmp = colRef.toString();
+ } else {
+ tmp = value;
+ }
+ return nextParameter == null ? tmp : tmp + "," + nextParameter.toString();
}
/**
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
index bc3886f..afb6a43 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
@@ -100,21 +100,24 @@ public class StreamingCoordinatorController extends BasicController {
}
}
- @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse assignStreamingCube(@PathVariable String cubeName) {
streamingCoordinartorService.assignCube(cubeName);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse unAssignStreamingCube(@PathVariable String cubeName) {
streamingCoordinartorService.unAssignCube(cubeName);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST })
+ @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse reAssignStreamingCube(@PathVariable String cubeName,
@RequestBody CubeAssignment newAssignments) {
@@ -122,49 +125,55 @@ public class StreamingCoordinatorController extends BasicController {
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST })
+ @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST }, produces = { "application/json" })
@ResponseBody
public CoordinatorResponse createReplicaSet(@RequestBody ReplicaSet rs) {
streamingCoordinartorService.createReplicaSet(rs);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE })
+ @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse deleteReplicaSet(@PathVariable Integer replicaSetID) {
streamingCoordinartorService.removeReplicaSet(replicaSetID);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse addNodeToReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) {
streamingCoordinartorService.addNodeToReplicaSet(replicaSetID, nodeID);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE })
+ @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE }, produces = {
+ "application/json" })
@ResponseBody
- public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) {
+ public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer replicaSetID,
+ @PathVariable String nodeID) {
streamingCoordinartorService.removeNodeFromReplicaSet(replicaSetID, nodeID);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = { RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse pauseCubeConsume(@PathVariable String cubeName) {
streamingCoordinartorService.pauseConsumers(cubeName);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT })
+ @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse resumeCubeConsume(@PathVariable String cubeName) {
streamingCoordinartorService.resumeConsumers(cubeName);
return new CoordinatorResponse();
}
- @RequestMapping(value = "/remoteStoreComplete", method = { RequestMethod.POST })
+ @RequestMapping(value = "/remoteStoreComplete", method = { RequestMethod.POST }, produces = { "application/json" })
@ResponseBody
public CoordinatorResponse segmentRemoteStoreComplete(@RequestBody RemoteStoreCompleteRequest request) {
Pair<Long, Long> segmentRange = new Pair<>(request.getSegmentStart(), request.getSegmentEnd());
@@ -176,7 +185,8 @@ public class StreamingCoordinatorController extends BasicController {
return new CoordinatorResponse();
}
- @RequestMapping(value = "/replicaSetLeaderChange", method = { RequestMethod.POST })
+ @RequestMapping(value = "/replicaSetLeaderChange", method = { RequestMethod.POST }, produces = {
+ "application/json" })
@ResponseBody
public CoordinatorResponse replicaSetLeaderChange(@RequestBody ReplicaSetLeaderChangeRequest request) {
logger.info("receive replicaSet leader change:" + request);
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index ec13485..36ae3b3 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -103,8 +103,8 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
final ResponseResultSchema schema = new ResponseResultSchema(cubeDesc, dimensions, metrics);
final StreamingTupleConverter tupleConverter = new StreamingTupleConverter(schema, tupleInfo);
final RecordsSerializer recordsSerializer = new RecordsSerializer(schema);
- final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime,
- tupleInfo, tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
+ final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo,
+ tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
logger.info("Query-{}:send request to stream receivers", query.getQueryId());
for (final ReplicaSet rs : replicaSetsOfCube) {
@@ -173,9 +173,8 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
return receivers.get((receiverNo + 1) % receiversSize);
}
- public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube,
- StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, Node receiver,
- TupleInfo tupleInfo) throws Exception {
+ public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter,
+ RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception {
String queryId = dataRequest.getQueryId();
logger.info("send query to receiver " + receiver + " with query id:" + queryId);
String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query";
@@ -235,7 +234,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
}
request.setGroups(groupSet);
- request.setMetrics(metrics);
+ request.setMetrics(Lists.newArrayList(metrics));
return request;
}
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index bb16dd5..f218d9d 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -100,9 +100,14 @@ import com.google.common.collect.Sets;
import javax.annotation.Nullable;
/**
- *
- * Each Kylin Streaming cluster has a coordinator to handle generic assignment, membership and streaming cube state management.
+ * <pre>
+ * Each Kylin streaming cluster has at least one coordinator process/server. The coordinator
+ * server works as the master node of the streaming cluster and handles generic assignment,
+ * membership and streaming cube state management.
*
+ * When a cluster has several coordinator processes, only the leader answers coordinator clients'
+ * requests; the other processes become standby/candidates, so the single point of failure is eliminated.
+ * </pre>
*/
public class Coordinator implements CoordinatorClient {
private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index dd8b58a..07c9028 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -18,6 +18,7 @@
package org.apache.kylin.stream.core.model;
+import java.util.List;
import java.util.Set;
import org.apache.kylin.metadata.model.FunctionDesc;
@@ -30,7 +31,7 @@ public class DataRequest {
private String havingFilter;
private Set<String> dimensions; // what contains in Pair is <tableName, columnName>
private Set<String> groups;
- private Set<FunctionDesc> metrics;
+ private List<FunctionDesc> metrics;
private int storagePushDownLimit = Integer.MAX_VALUE;
private boolean allowStorageAggregation;
@@ -78,11 +79,11 @@ public class DataRequest {
this.groups = groups;
}
- public Set<FunctionDesc> getMetrics() {
+ public List<FunctionDesc> getMetrics() {
return metrics;
}
- public void setMetrics(Set<FunctionDesc> metrics) {
+ public void setMetrics(List<FunctionDesc> metrics) {
this.metrics = metrics;
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 0e05eaf..537f5a4 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -149,6 +149,8 @@ public class StreamingSegmentManager implements Closeable {
}
activeSegments.put(segmentStart, segment);
+ // when the number of active segments exceeds the tolerance, some unpredictable accident may happen;
+ // the threshold should be configurable or computed on the fly
if (activeSegments.size() > 12) {
logger.warn("Two many active segments, segments size = " + activeSegments.keySet());
}
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
index 1c75460..50c4ba6 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
@@ -109,6 +109,16 @@ public class RestService {
HttpResponse response = httpClient.execute(request);
String msg = EntityUtils.toString(response.getEntity());
int code = response.getStatusLine().getStatusCode();
+ if (logger.isTraceEnabled()) {
+ String displayMessage;
+ if (msg.length() > 500) {
+ displayMessage = msg.substring(0, 500);
+ } else {
+ displayMessage = msg;
+ }
+ logger.trace("Send request: {}. And receive response[{}] which lenght is {}, and content is {}.", code,
+ request.getRequestLine().toString(), msg.length(), displayMessage);
+ }
if (code != 200)
throw new IOException("Invalid http response " + code + " when send request: "
+ request.getURI().toString() + "\n" + msg);
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 481b746..c171561 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -85,7 +85,7 @@ import org.apache.kylin.stream.core.storage.columnar.ColumnarStoreCache;
import org.apache.kylin.stream.core.util.HDFSUtil;
import org.apache.kylin.stream.core.util.NamedThreadFactory;
import org.apache.kylin.stream.core.util.NodeUtil;
-import org.apache.kylin.stream.server.retention.PolicyInfo;
+import org.apache.kylin.stream.server.retention.RetentionPolicyInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -109,6 +109,9 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
private StreamMetadataStore streamMetadataStore;
private Node currentNode;
private int replicaSetID = -1;
+ /**
+ * indicates whether the current receiver is the leader of the whole replica set
+ */
private volatile boolean isLeader = false;
private ScheduledExecutorService segmentStateCheckerExecutor;
@@ -155,21 +158,21 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
CubeInstance cubeInstance = segmentManager.getCubeInstance();
String cubeName = cubeInstance.getName();
try {
- PolicyInfo policyInfo = new PolicyInfo();
+ RetentionPolicyInfo retentionPolicyInfo = new RetentionPolicyInfo();
String policyName = cubeInstance.getConfig().getStreamingSegmentRetentionPolicy();
Map<String, String> policyProps = cubeInstance.getConfig()
.getStreamingSegmentRetentionPolicyProperties(policyName);
- policyInfo.setName(policyName);
- policyInfo.setProperties(policyProps);
+ retentionPolicyInfo.setName(policyName);
+ retentionPolicyInfo.setProperties(policyProps);
//The returned segments that require remote persisted are already sorted in ascending order by the segment start time
Collection<StreamingCubeSegment> segments = segmentManager.getRequireRemotePersistSegments();
if (!segments.isEmpty()) {
logger.info("found cube {} segments:{} are immutable, retention policy is: {}", cubeName,
- segments, policyInfo.getName());
+ segments, retentionPolicyInfo.getName());
} else {
continue;
}
- handleImmutableCubeSegments(cubeName, segmentManager, segments, policyInfo);
+ handleImmutableCubeSegments(cubeName, segmentManager, segments, retentionPolicyInfo);
} catch (Exception e) {
logger.error("error when handle cube:" + cubeName, e);
}
@@ -178,14 +181,21 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
}, 60, 60, TimeUnit.SECONDS);
}
+ /**
+ * <pre>
+ * When segment status was changed to immutable, the leader of replica will
+ * try to upload local segment cache to remote, while the follower will remove
+ * local segment cache.
+ * </pre>
+ */
private void handleImmutableCubeSegments(String cubeName, StreamingSegmentManager segmentManager,
- Collection<StreamingCubeSegment> segments, PolicyInfo policyInfo) throws Exception {
- if (PolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(policyInfo.getName())) {
+ Collection<StreamingCubeSegment> segments, RetentionPolicyInfo retentionPolicyInfo) throws Exception {
+ if (RetentionPolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(retentionPolicyInfo.getName())) {
if (isLeader) {
sendSegmentsToFullBuild(cubeName, segmentManager, segments);
}
} else {
- purgeSegments(cubeName, segments, policyInfo.getProperties());
+ purgeSegments(cubeName, segments, retentionPolicyInfo.getProperties());
}
}
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 2fdc218..45c6307 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -18,6 +18,7 @@
package org.apache.kylin.stream.server.rest.controller;
+import java.util.List;
import java.util.Set;
import org.apache.commons.codec.binary.Base64;
@@ -140,7 +141,7 @@ public class DataController extends BasicController {
}
}
- private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, Set<FunctionDesc> metrics) {
+ private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, List<FunctionDesc> metrics) {
Set<FunctionDesc> result = Sets.newHashSet();
for (FunctionDesc metric : metrics) {
result.add(findAggrFuncFromCubeDesc(cubeDesc, metric));
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
similarity index 87%
rename from stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
rename to stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
index 08b3a6c..0320032 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
@@ -22,8 +22,17 @@ import java.util.Map;
import com.google.common.collect.Maps;
-public class PolicyInfo {
+/**
+ * Retention policy for local segment cache
+ */
+public class RetentionPolicyInfo {
+ /**
+ * outdated data will be dropped
+ */
public static final String PURGE_POLICY = "purge";
+ /**
+ * when data becomes immutable, it will be persisted remotely
+ */
public static final String FULL_BUILD_POLICY = "fullBuild";
private String name;