You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@kylin.apache.org by GitBox <gi...@apache.org> on 2018/12/27 03:06:32 UTC

[GitHub] allenma closed pull request #417: KYLIN-3742 Fix DataRequest for NPE and add some javadoc

allenma closed pull request #417: KYLIN-3742 Fix DataRequest for NPE and add some javadoc
URL: https://github.com/apache/kylin/pull/417
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index f757503ebc..45af397278 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -149,8 +149,11 @@ public boolean equals(Object o) {
             if (p.isColumnType()) {
                 if (q.isColumnType() == false)
                     return false;
-                if (q.getColRef().equals(p.getColRef()) == false)
+                if (q.getColRef() != null && q.getColRef().equals(p.getColRef()) == false)
                     return false;
+                if (!(q.getType().equals(p.getType()) && q.getValue().equals(q.getValue()))) {
+                    return false;
+                }
             } else {
                 if (q.isColumnType() == true)
                     return false;
@@ -185,8 +188,13 @@ public int hashCode() {
 
     @Override
     public String toString() {
-        String thisStr = isColumnType() ? colRef.toString() : value;
-        return nextParameter == null ? thisStr : thisStr + "," + nextParameter.toString();
+        String tmp = null;
+        if (isColumnType() && colRef != null) {
+            tmp = colRef.toString();
+        } else {
+            tmp = value;
+        }
+        return nextParameter == null ? tmp : tmp + "," + nextParameter.toString();
     }
 
     /**
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
index bc3886fa97..afb6a43afb 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/StreamingCoordinatorController.java
@@ -100,21 +100,24 @@ public CoordinatorResponse reBalance(@RequestBody String reBalancePlanStr) {
         }
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/cubes/{cubeName}/assign", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse assignStreamingCube(@PathVariable String cubeName) {
         streamingCoordinartorService.assignCube(cubeName);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/cubes/{cubeName}/unAssign", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse unAssignStreamingCube(@PathVariable String cubeName) {
         streamingCoordinartorService.unAssignCube(cubeName);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST })
+    @RequestMapping(value = "/cubes/{cubeName}/reAssign", method = { RequestMethod.POST }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse reAssignStreamingCube(@PathVariable String cubeName,
             @RequestBody CubeAssignment newAssignments) {
@@ -122,49 +125,55 @@ public CoordinatorResponse reAssignStreamingCube(@PathVariable String cubeName,
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST })
+    @RequestMapping(value = "/replicaSet", method = { RequestMethod.POST }, produces = { "application/json" })
     @ResponseBody
     public CoordinatorResponse createReplicaSet(@RequestBody ReplicaSet rs) {
         streamingCoordinartorService.createReplicaSet(rs);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE })
+    @RequestMapping(value = "/replicaSet/{replicaSetID}", method = { RequestMethod.DELETE }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse deleteReplicaSet(@PathVariable Integer replicaSetID) {
         streamingCoordinartorService.removeReplicaSet(replicaSetID);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse addNodeToReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) {
         streamingCoordinartorService.addNodeToReplicaSet(replicaSetID, nodeID);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE })
+    @RequestMapping(value = "/replicaSet/{replicaSetID}/{nodeID:.+}", method = { RequestMethod.DELETE }, produces = {
+            "application/json" })
     @ResponseBody
-    public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer replicaSetID, @PathVariable String nodeID) {
+    public CoordinatorResponse removeNodeFromReplicaSet(@PathVariable Integer replicaSetID,
+            @PathVariable String nodeID) {
         streamingCoordinartorService.removeNodeFromReplicaSet(replicaSetID, nodeID);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/cubes/{cubeName}/pauseConsume", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse pauseCubeConsume(@PathVariable String cubeName) {
         streamingCoordinartorService.pauseConsumers(cubeName);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT })
+    @RequestMapping(value = "/cubes/{cubeName}/resumeConsume", method = { RequestMethod.PUT }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse resumeCubeConsume(@PathVariable String cubeName) {
         streamingCoordinartorService.resumeConsumers(cubeName);
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/remoteStoreComplete", method = { RequestMethod.POST })
+    @RequestMapping(value = "/remoteStoreComplete", method = { RequestMethod.POST }, produces = { "application/json" })
     @ResponseBody
     public CoordinatorResponse segmentRemoteStoreComplete(@RequestBody RemoteStoreCompleteRequest request) {
         Pair<Long, Long> segmentRange = new Pair<>(request.getSegmentStart(), request.getSegmentEnd());
@@ -176,7 +185,8 @@ public CoordinatorResponse segmentRemoteStoreComplete(@RequestBody RemoteStoreCo
         return new CoordinatorResponse();
     }
 
-    @RequestMapping(value = "/replicaSetLeaderChange", method = { RequestMethod.POST })
+    @RequestMapping(value = "/replicaSetLeaderChange", method = { RequestMethod.POST }, produces = {
+            "application/json" })
     @ResponseBody
     public CoordinatorResponse replicaSetLeaderChange(@RequestBody ReplicaSetLeaderChangeRequest request) {
         logger.info("receive replicaSet leader change:" + request);
diff --git a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
index ec13485ec0..36ae3b315d 100644
--- a/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
+++ b/storage-stream/src/main/java/org/apache/kylin/storage/stream/rpc/HttpStreamDataSearchClient.java
@@ -103,8 +103,8 @@ public ITupleIterator search(final long minSegmentTime, final CubeInstance cube,
         final ResponseResultSchema schema = new ResponseResultSchema(cubeDesc, dimensions, metrics);
         final StreamingTupleConverter tupleConverter = new StreamingTupleConverter(schema, tupleInfo);
         final RecordsSerializer recordsSerializer = new RecordsSerializer(schema);
-        final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime,
-                tupleInfo, tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
+        final DataRequest dataRequest = createDataRequest(query.getQueryId(), cube.getName(), minSegmentTime, tupleInfo,
+                tupleFilter, dimensions, groups, metrics, storagePushDownLimit, allowStorageAggregation);
 
         logger.info("Query-{}:send request to stream receivers", query.getQueryId());
         for (final ReplicaSet rs : replicaSetsOfCube) {
@@ -173,9 +173,8 @@ private Node findBestReceiverServeQuery(List<Node> receivers, Node lead, String
         return receivers.get((receiverNo + 1) % receiversSize);
     }
 
-    public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube,
-            StreamingTupleConverter tupleConverter, RecordsSerializer recordsSerializer, Node receiver,
-            TupleInfo tupleInfo) throws Exception {
+    public Iterator<ITuple> doSearch(DataRequest dataRequest, CubeInstance cube, StreamingTupleConverter tupleConverter,
+            RecordsSerializer recordsSerializer, Node receiver, TupleInfo tupleInfo) throws Exception {
         String queryId = dataRequest.getQueryId();
         logger.info("send query to receiver " + receiver + " with query id:" + queryId);
         String url = "http://" + receiver.getHost() + ":" + receiver.getPort() + "/kylin/api/data/query";
@@ -235,7 +234,7 @@ private DataRequest createDataRequest(String queryId, String cubeName, long minS
         }
         request.setGroups(groupSet);
 
-        request.setMetrics(metrics);
+        request.setMetrics(Lists.newArrayList(metrics));
 
         return request;
     }
diff --git a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
index bb16dd5d3e..f218d9d573 100644
--- a/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
+++ b/stream-coordinator/src/main/java/org/apache/kylin/stream/coordinator/Coordinator.java
@@ -100,9 +100,14 @@
 import javax.annotation.Nullable;
 
 /**
- * 
- * Each Kylin Streaming cluster has a coordinator to handle generic assignment, membership and streaming cube state management.
+ * <pre>
+ * Each Kylin streaming cluster has at least one coordinator process/server. The coordinator
+ * server works as the master node of the streaming cluster and handles generic assignment,
+ * membership and streaming cube state management.
  *
+ * When a cluster has several coordinator processes, only the leader answers coordinator clients'
+ * requests; the other processes become standby/candidates, so the single point of failure is eliminated.
+ * </pre>
  */
 public class Coordinator implements CoordinatorClient {
     private static final Logger logger = LoggerFactory.getLogger(Coordinator.class);
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
index dd8b58a4ee..07c9028729 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/model/DataRequest.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.stream.core.model;
 
+import java.util.List;
 import java.util.Set;
 
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -30,7 +31,7 @@
     private String havingFilter;
     private Set<String> dimensions; // what contains in Pair is <tableName, columnName>
     private Set<String> groups;
-    private Set<FunctionDesc> metrics;
+    private List<FunctionDesc> metrics;
     private int storagePushDownLimit = Integer.MAX_VALUE;
     private boolean allowStorageAggregation;
 
@@ -78,11 +79,11 @@ public void setGroups(Set<String> groups) {
         this.groups = groups;
     }
 
-    public Set<FunctionDesc> getMetrics() {
+    public List<FunctionDesc> getMetrics() {
         return metrics;
     }
 
-    public void setMetrics(Set<FunctionDesc> metrics) {
+    public void setMetrics(List<FunctionDesc> metrics) {
         this.metrics = metrics;
     }
 
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 0e05eaf48c..537f5a4739 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -149,6 +149,8 @@ public void addEvent(StreamingMessage event) {
                 }
 
                 activeSegments.put(segmentStart, segment);
+                // when the number of active segments exceeds the tolerance, unpredictable problems may happen;
+                // the threshold should be configurable or computed on the fly
                 if (activeSegments.size() > 12) {
                     logger.warn("Two many active segments, segments size = " + activeSegments.keySet());
                 }
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
index 1c75460ed4..50c4ba691e 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/util/RestService.java
@@ -109,6 +109,16 @@ public String execRequest(HttpRequestBase request, int connectionTimeout, int re
             HttpResponse response = httpClient.execute(request);
             String msg = EntityUtils.toString(response.getEntity());
             int code = response.getStatusLine().getStatusCode();
+            if (logger.isTraceEnabled()) {
+                String displayMessage;
+                if (msg.length() > 500) {
+                    displayMessage = msg.substring(0, 500);
+                } else {
+                    displayMessage = msg;
+                }
+                logger.trace("Send request: {}. And receive response[{}] which lenght is {}, and content is {}.", code,
+                        request.getRequestLine().toString(), msg.length(), displayMessage);
+            }
             if (code != 200)
                 throw new IOException("Invalid http response " + code + " when send request: "
                         + request.getURI().toString() + "\n" + msg);
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
index 481b74670f..c171561fca 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/StreamingServer.java
@@ -85,7 +85,7 @@
 import org.apache.kylin.stream.core.util.HDFSUtil;
 import org.apache.kylin.stream.core.util.NamedThreadFactory;
 import org.apache.kylin.stream.core.util.NodeUtil;
-import org.apache.kylin.stream.server.retention.PolicyInfo;
+import org.apache.kylin.stream.server.retention.RetentionPolicyInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -109,6 +109,9 @@
     private StreamMetadataStore streamMetadataStore;
     private Node currentNode;
     private int replicaSetID = -1;
+    /**
+     * indicates whether the current receiver is the leader of the whole replica set
+     */
     private volatile boolean isLeader = false;
 
     private ScheduledExecutorService segmentStateCheckerExecutor;
@@ -155,21 +158,21 @@ public void run() {
                     CubeInstance cubeInstance = segmentManager.getCubeInstance();
                     String cubeName = cubeInstance.getName();
                     try {
-                        PolicyInfo policyInfo = new PolicyInfo();
+                        RetentionPolicyInfo retentionPolicyInfo = new RetentionPolicyInfo();
                         String policyName = cubeInstance.getConfig().getStreamingSegmentRetentionPolicy();
                         Map<String, String> policyProps = cubeInstance.getConfig()
                                 .getStreamingSegmentRetentionPolicyProperties(policyName);
-                        policyInfo.setName(policyName);
-                        policyInfo.setProperties(policyProps);
+                        retentionPolicyInfo.setName(policyName);
+                        retentionPolicyInfo.setProperties(policyProps);
                         //The returned segments that require remote persisted are already sorted in ascending order by the segment start time
                         Collection<StreamingCubeSegment> segments = segmentManager.getRequireRemotePersistSegments();
                         if (!segments.isEmpty()) {
                             logger.info("found cube {} segments:{} are immutable, retention policy is: {}", cubeName,
-                                    segments, policyInfo.getName());
+                                    segments, retentionPolicyInfo.getName());
                         } else {
                             continue;
                         }
-                        handleImmutableCubeSegments(cubeName, segmentManager, segments, policyInfo);
+                        handleImmutableCubeSegments(cubeName, segmentManager, segments, retentionPolicyInfo);
                     } catch (Exception e) {
                         logger.error("error when handle cube:" + cubeName, e);
                     }
@@ -178,14 +181,21 @@ public void run() {
         }, 60, 60, TimeUnit.SECONDS);
     }
 
+    /**
+     * <pre>
+     * When segment status was changed to immutable, the leader of replica will
+     * try to upload local segment cache to remote, while the follower will remove
+     * local segment cache.
+     * </pre>
+     */
     private void handleImmutableCubeSegments(String cubeName, StreamingSegmentManager segmentManager,
-            Collection<StreamingCubeSegment> segments, PolicyInfo policyInfo) throws Exception {
-        if (PolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(policyInfo.getName())) {
+            Collection<StreamingCubeSegment> segments, RetentionPolicyInfo retentionPolicyInfo) throws Exception {
+        if (RetentionPolicyInfo.FULL_BUILD_POLICY.equalsIgnoreCase(retentionPolicyInfo.getName())) {
             if (isLeader) {
                 sendSegmentsToFullBuild(cubeName, segmentManager, segments);
             }
         } else {
-            purgeSegments(cubeName, segments, policyInfo.getProperties());
+            purgeSegments(cubeName, segments, retentionPolicyInfo.getProperties());
         }
     }
 
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
index 2fdc218ae2..45c6307518 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/rest/controller/DataController.java
@@ -18,6 +18,7 @@
 
 package org.apache.kylin.stream.server.rest.controller;
 
+import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.codec.binary.Base64;
@@ -140,7 +141,7 @@ public DataResponse query(@RequestBody DataRequest dataRequest) {
         }
     }
 
-    private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, Set<FunctionDesc> metrics) {
+    private Set<FunctionDesc> convertMetrics(CubeDesc cubeDesc, List<FunctionDesc> metrics) {
         Set<FunctionDesc> result = Sets.newHashSet();
         for (FunctionDesc metric : metrics) {
             result.add(findAggrFuncFromCubeDesc(cubeDesc, metric));
diff --git a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
similarity index 87%
rename from stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
rename to stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
index 08b3a6c3fc..0320032108 100644
--- a/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/PolicyInfo.java
+++ b/stream-receiver/src/main/java/org/apache/kylin/stream/server/retention/RetentionPolicyInfo.java
@@ -22,8 +22,17 @@
 
 import com.google.common.collect.Maps;
 
-public class PolicyInfo {
+/**
+ * Retention policy for local segment cache
+ */
+public class RetentionPolicyInfo {
+    /**
+     * outdated data will be dropped
+     */
     public static final String PURGE_POLICY = "purge";
+    /**
+     * when data becomes immutable, it will be persisted remotely
+     */
     public static final String FULL_BUILD_POLICY = "fullBuild";
 
     private String name;


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services