You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2019/01/28 06:05:32 UTC

[kylin] branch realtime-streaming updated: KYLIN-3791 Map return by Maps.transformValues is a immutable view

This is an automated email from the ASF dual-hosted git repository.

magang pushed a commit to branch realtime-streaming
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/realtime-streaming by this push:
     new 67101ce  KYLIN-3791 Map return by Maps.transformValues is a immutable view
67101ce is described below

commit 67101ce6c3f3ab8dcad855cd98e6c97deda063f0
Author: hit-lacus <hi...@126.com>
AuthorDate: Sat Jan 26 21:47:37 2019 +0800

    KYLIN-3791 Map return by Maps.transformValues is a immutable view
---
 .../stream/core/storage/StreamingSegmentManager.java   | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 28a294d..38c53fe 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -62,12 +62,18 @@ public class StreamingSegmentManager implements Closeable {
     private final String cubeName;
     private final CubeInstance cubeInstance;
 
-    //Cube window defines how streaming events are divided and put into different segments , for example 1 hour per segment(indexer).
-    //If the received events' timestamp is completely out of order and belongs to a very wide range, there will be multiple active segment indexers created and serve the indexing and querying.
+    /**
+     * Cube window defines how streaming events are divided and put into different segments, for example 1 hour per segment (indexer).
+     * If the received events' timestamps are completely out of order and span a very wide range,
+     * multiple active segment indexers will be created to serve indexing and querying.
+     * */
     private final long cubeWindow;
 
-    //Cube duration defines how long the oldest streaming segment becomes immutable and does not allow additional modification.
-    //Any further long latency events that can't find a corresponding segment to serve the index, the events will be put to a specific segment for long latency events only.
+    /**
+     * Cube duration defines how long it takes before the oldest streaming segment becomes immutable and no longer allows modification.
+     * Any later long-latency events that cannot find a corresponding segment to be indexed into
+     * will be put into a dedicated segment for long-latency events only.
+     * */
     private final long cubeDuration;
 
     private final long maxCubeDuration;
@@ -223,13 +229,13 @@ public class StreamingSegmentManager implements Closeable {
     private void restoreSegmentsFromCP(List<File> segmentFolders, Map<Long, String> checkpointStoreStats,
                                        Map<Long, String> segmentSourceStartPositions, CubeSegment latestRemoteSegment) {
         if (segmentSourceStartPositions != null) {
-            this.segmentSourceStartPositions = Maps.transformValues(segmentSourceStartPositions, new Function<String, ISourcePosition>() {
+            this.segmentSourceStartPositions.putAll(Maps.transformValues(segmentSourceStartPositions, new Function<String, ISourcePosition>() {
                 @Nullable
                 @Override
                 public ISourcePosition apply(@Nullable String input) {
                     return sourcePositionHandler.parsePosition(input);
                 }
-            });
+            }));
         }
         for (File segmentFolder : segmentFolders) {
             try {