You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2018/12/27 03:06:35 UTC

[kylin] branch realtime-streaming updated: KYLIN-3742 Fix DataRequest for NPE and add some javadoc

This is an automated email from the ASF dual-hosted git repository.

magang pushed a commit to branch realtime-streaming
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/realtime-streaming by this push:
     new c406029  KYLIN-3742 Fix DataRequest for NPE and add some javadoc
c406029 is described below

commit c406029bf6fea7dc78417aa4ff20a13c092e0079
Author: hit-lacus <hi...@126.com>
AuthorDate: Wed Dec 26 20:37:22 2018 +0800

    KYLIN-3742 Fix DataRequest for NPE and add some javadoc
---
 .../apache/kylin/metadata/model/ParameterDesc.java | 14 +++++++--
 .../controller/StreamingCoordinatorController.java | 34 ++++++++++++++--------
 .../stream/rpc/HttpStreamDataSearchClient.java     | 11 ++++---
 .../kylin/stream/coordinator/Coordinator.java      |  9 ++++--
 .../kylin/stream/core/model/DataRequest.java       |  7 +++--
 .../core/storage/StreamingSegmentManager.java      |  2 ++
 .../apache/kylin/stream/core/util/RestService.java | 10 +++++++
 .../kylin/stream/server/StreamingServer.java       | 28 ++++++++++++------
 .../server/rest/controller/DataController.java     |  3 +-
 .../{PolicyInfo.java => RetentionPolicyInfo.java}  | 11 ++++++-
 10 files changed, 92 insertions(+), 37 deletions(-)

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index f757503..45af397 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -149,8 +149,11 @@ public class ParameterDesc implements Serializable {
             if (p.isColumnType()) {
                 if (q.isColumnType() == false)
                     return false;
-                if (q.getColRef().equals(p.getColRef()) == false)
+                if (q.getColRef() != null && q.getColRef().equals(p.getColRef()) == false)
                     return false;
+                if (!(q.getType().equals(p.getType()) && q.getValue().equals(q.getValue()))) {
+                    return false;
+                }
             } else {
                 if (q.isColumnType() == true)
                     return false;
@@ -185,8 +188,13 @@ public class ParameterDesc implements Serializable {
 
     @Override
     public String toString() {
-        String thisStr = isColumnType() ? colRef.toString() : value;
-        return nextParameter == null ? thisStr : thisStr + "," + nextParameter.toString();
+        String tmp = null;
+        if (isColumnType() && colRef != null) {
+            tmp = colRef.toString();
+        } else {
+            tmp = value;
+        }
+        return nextParameter == null ? tmp : tmp + "," + nextParameter.toString();
     }
 
     /**
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
index bc3886f..afb6a43 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
@@ -100,21 +100,24 @@ public class StreamingCoordinatorController extends BasicController {
         }
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse assignStreamingCube(@PathVariable String cubeName) {
         streamingCoordinartorService.assignCube(cubeName);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse unAssignStreamingCube(@PathVariable String cubeName) {
         streamingCoordinartorService.unAssignCube(cubeName);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST })
+    @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse reAssignStreamingCube(@PathVariable String cubeName,
             @RequestBody CubeAssignment newAssignments) {
@@ -122,49 +125,55 @@ public class StreamingCoordinatorController extends BasicController {
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST })
+    @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST }, produces = { "application/json" })
     @ResponseBody
     public CoordinatorResponse createReplicaSet(@RequestBody ReplicaSet rs) {
         streamingCoordinartorService.createReplicaSet(rs);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE })
+    @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse deleteReplicaSet(@PathVariable Integer replicaSetID) {
         streamingCoordinartorService.removeReplicaSet(replicaSetID);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse addNodeToReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) {
         streamingCoordinartorService.addNodeToReplicaSet(replicaSetID, nodeID);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE })
+    @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE }, produces = {
+            "application/json" })
     @ResponseBody
-    public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) {
+    public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer replicaSetID,
+            @PathVariable String nodeID) {
         streamingCoordinartorService.removeNodeFromReplicaSet(replicaSetID, nodeID);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse pauseCubeConsume(@PathVariable String cubeName) {
         streamingCoordinartorService.pauseConsumers(cubeName);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse resumeCubeConsume(@PathVariable String cubeName) {
         streamingCoordinartorService.resumeConsumers(cubeName);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/remoteStoreComplete", method = { RequestMethod.POST })
+    @RequestMapping(value = "/remoteStoreComplete", method = { RequestMethod.POST }, produces = { "application/json" })
     @ResponseBody
     public CoordinatorResponse segmentRemoteStoreComplete(@RequestBody RemoteStoreCompleteRequest request) {
         Pair<Long, Long> segmentRange = new Pair<>(request.getSegmentStart(), request.getSegmentEnd());
@@ -176,7 +185,8 @@ public class StreamingCoordinatorController extends BasicController {
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSetLeaderChange", method = { RequestMethod.POST })
+    @RequestMapping(value = "/replicaSetLeaderChange", method = { RequestMethod.POST }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse replicaSetLeaderChange(@RequestBody ReplicaSetLeaderChangeRequest request) {
         logger.info("receive replicaSet leader change:" + request);
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index ec13485..36ae3b3 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -103,8 +103,8 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
         final ResponseResultSchema schema = new ResponseResultSchema(cubeDesc, dimensions, metrics);
         final StreamingTupleConverter tupleConverter = new StreamingTupleConverter(schema, tupleInfo);
         final RecordsSerializer recordsSerializer = new RecordsSerializer(schema);
-        final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime,
-                tupleInfo, tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
+        final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo,
+                tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
 
         logger.info("Query-{}:send request to stream receivers", query.getQueryId());
         for (final ReplicaSet rs : replicaSetsOfCube) {
@@ -173,9 +173,8 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
         return receivers.get((receiverNo + 1) % receiversSize);
     }
 
-    public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube,
-            StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, Node receiver,
-            TupleInfo tupleInfo) throws Exception {
+    public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter,
+            RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception {
         String queryId = dataRequest.getQueryId();
         logger.info("send query to receiver " + receiver + " with query id:" + queryId);
         String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query";
@@ -235,7 +234,7 @@ public class HttpStreamDataSearchClient implements IStreamDataSearchClient {
         }
         request.setGroups(groupSet);
 
-        request.setMetrics(metrics);
+        request.setMetrics(Lists.newArrayList(metrics));
 
         return request;
     }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index bb16dd5..f218d9d 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -100,9 +100,14 @@ import com.google.common.collect.Sets;
 import javax.annotation.Nullable;
 
 /**
- * 
- * Each Kylin Streaming cluster has a coordinator to handle generic assignment, membership and streaming cube state management.
+ * <pre>
+ * Each Kylin streaming cluster has at least one coordinator process/server. The coordinator
+ * server works as the master node of the streaming cluster and handles generic assignment,
+ * membership and streaming cube state management.
  *
+ * When a cluster has several coordinator processes, only the leader tries to answer coordinator clients'
+ * requests; the other processes become standby candidates, so the single point of failure is eliminated.
+ * </pre>
  */
 public class Coordinator implements CoordinatorClient {
     private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index dd8b58a..07c9028 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.stream.core.model;
 
+import java.util.List;
 import java.util.Set;
 
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -30,7 +31,7 @@ public class DataRequest {
     private String havingFilter;
     private Set<String> dimensions; // what contains in Pair is <tableName, columnName>
     private Set<String> groups;
-    private Set<FunctionDesc> metrics;
+    private List<FunctionDesc> metrics;
     private int storagePushDownLimit = Integer.MAX_VALUE;
     private boolean allowStorageAggregation;
 
@@ -78,11 +79,11 @@ public class DataRequest {
         this.groups = groups;
     }
 
-    public Set<FunctionDesc> getMetrics() {
+    public List<FunctionDesc> getMetrics() {
         return metrics;
     }
 
-    public void setMetrics(Set<FunctionDesc> metrics) {
+    public void setMetrics(List<FunctionDesc> metrics) {
         this.metrics = metrics;
     }
 
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 0e05eaf..537f5a4 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -149,6 +149,8 @@ public class StreamingSegmentManager implements Closeable {
                 }
 
                 activeSegments.put(segmentStart, segment);
+                // when the number of active segments exceeds the tolerance, unpredictable problems may happen;
+                // the threshold should be configurable or computed on the fly instead of being hard-coded
                 if (activeSegments.size() > 12) {
                     logger.warn("Two many active segments, segments size = " + activeSegments.keySet());
                 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
index 1c75460..50c4ba6 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
@@ -109,6 +109,16 @@ public class RestService {
             HttpResponse response = httpClient.execute(request);
             String msg = EntityUtils.toString(response.getEntity());
             int code = response.getStatusLine().getStatusCode();
+            if (logger.isTraceEnabled()) {
+                String displayMessage;
+                if (msg.length() > 500) {
+                    displayMessage = msg.substring(0, 500);
+                } else {
+                    displayMessage = msg;
+                }
+                logger.trace("Send request: {}. And receive response[{}] which lenght is {}, and content is {}.", code,
+                        request.getRequestLine().toString(), msg.length(), displayMessage);
+            }
             if (code != 200)
                 throw new IOException("Invalid http response " + code + " when send request: "
                         + request.getURI().toString() + "\n" + msg);
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 481b746..c171561 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -85,7 +85,7 @@ import org.apache.kylin.stream.core.storage.columnar.ColumnarStoreCache;
 import org.apache.kylin.stream.core.util.HDFSUtil;
 import org.apache.kylin.stream.core.util.NamedThreadFactory;
 import org.apache.kylin.stream.core.util.NodeUtil;
-import org.apache.kylin.stream.server.retention.PolicyInfo;
+import org.apache.kylin.stream.server.retention.RetentionPolicyInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,6 +109,9 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
     private StreamMetadataStore streamMetadataStore;
     private Node currentNode;
     private int replicaSetID = -1;
+    /**
+     * Indicates whether the current receiver is the leader of the whole replica set.
+     */
     private volatile boolean isLeader = false;
 
     private ScheduledExecutorService segmentStateCheckerExecutor;
@@ -155,21 +158,21 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
                     CubeInstance cubeInstance = segmentManager.getCubeInstance();
                     String cubeName = cubeInstance.getName();
                     try {
-                        PolicyInfo policyInfo = new PolicyInfo();
+                        RetentionPolicyInfo retentionPolicyInfo = new RetentionPolicyInfo();
                         String policyName = cubeInstance.getConfig().getStreamingSegmentRetentionPolicy();
                         Map<String, String> policyProps = cubeInstance.getConfig()
                                 .getStreamingSegmentRetentionPolicyProperties(policyName);
-                        policyInfo.setName(policyName);
-                        policyInfo.setProperties(policyProps);
+                        retentionPolicyInfo.setName(policyName);
+                        retentionPolicyInfo.setProperties(policyProps);
                         //The returned segments that require remote persisted are already sorted in ascending order by the segment start time
                         Collection<StreamingCubeSegment> segments = segmentManager.getRequireRemotePersistSegments();
                         if (!segments.isEmpty()) {
                             logger.info("found cube {} segments:{} are immutable, retention policy is: {}", cubeName,
-                                    segments, policyInfo.getName());
+                                    segments, retentionPolicyInfo.getName());
                         } else {
                             continue;
                         }
-                        handleImmutableCubeSegments(cubeName, segmentManager, segments, policyInfo);
+                        handleImmutableCubeSegments(cubeName, segmentManager, segments, retentionPolicyInfo);
                     } catch (Exception e) {
                         logger.error("error when handle cube:" + cubeName, e);
                     }
@@ -178,14 +181,21 @@ public class StreamingServer implements ReplicaSetLeaderSelector.LeaderChangeLis
         }, 60, 60, TimeUnit.SECONDS);
     }
 
+    /**
+     * <pre>
+     * When a segment's status is changed to immutable, the leader of the replica set
+     * will try to upload the local segment cache to remote, while a follower will
+     * remove its local segment cache.
+     * </pre>
+     */
     private void handleImmutableCubeSegments(String cubeName, StreamingSegmentManager segmentManager,
-            Collection<StreamingCubeSegment> segments, PolicyInfo policyInfo) throws Exception {
-        if (PolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(policyInfo.getName())) {
+            Collection<StreamingCubeSegment> segments, RetentionPolicyInfo retentionPolicyInfo) throws Exception {
+        if (RetentionPolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(retentionPolicyInfo.getName())) {
             if (isLeader) {
                 sendSegmentsToFullBuild(cubeName, segmentManager, segments);
             }
         } else {
-            purgeSegments(cubeName, segments, policyInfo.getProperties());
+            purgeSegments(cubeName, segments, retentionPolicyInfo.getProperties());
         }
     }
 
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 2fdc218..45c6307 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.stream.server.rest.controller;
 
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.codec.binary.Base64;
@@ -140,7 +141,7 @@ public class DataController extends BasicController {
         }
     }
 
-    private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, Set<FunctionDesc> metrics) {
+    private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, List<FunctionDesc> metrics) {
         Set<FunctionDesc> result = Sets.newHashSet();
         for (FunctionDesc metric : metrics) {
             result.add(findAggrFuncFromCubeDesc(cubeDesc, metric));
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
similarity index 87%
rename from stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
rename to stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
index 08b3a6c..0320032 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
@@ -22,8 +22,17 @@ import java.util.Map;
 
 import com.google.common.collect.Maps;
 
-public class PolicyInfo {
+/**
+ * Retention policy for local segment cache
+ */
+public class RetentionPolicyInfo {
+    /**
+     * outdated data will be dropped
+     */
     public static final String PURGE_POLICY = "purge";
+    /**
+     * when data becomes immutable, it will be persisted remotely
+     */
     public static final String FULL_BUILD_POLICY = "fullBuild";
 
     private String name;