You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ma...@apache.org on 2018/12/28 03:15:16 UTC

[kylin] branch realtime-streaming updated: KYLIN-3745 Real-time segment state change from active to immutable is not sequential

This is an automated email from the ASF dual-hosted git repository.

magang pushed a commit to branch realtime-streaming
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/realtime-streaming by this push:
     new a3e76c0  KYLIN-3745 Real-time segment state change from active to immutable is not sequential
a3e76c0 is described below

commit a3e76c07357b7132373c16dd45a71aa7a470c9cf
Author: Ma,Gang <ga...@ebay.com>
AuthorDate: Fri Dec 28 11:14:51 2018 +0800

    KYLIN-3745 Real-time segment state change from active to immutable is not sequential
---
 .../apache/kylin/stream/core/storage/StreamingSegmentManager.java    | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
index 537f5a4..28a294d 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/StreamingSegmentManager.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
@@ -80,8 +81,8 @@ public class StreamingSegmentManager implements Closeable {
 
     private final IConsumerProvider consumerProvider;
 
-    private final Map<Long, StreamingCubeSegment> activeSegments = Maps.newConcurrentMap();
-    private final Map<Long, StreamingCubeSegment> immutableSegments = Maps.newConcurrentMap();
+    private final Map<Long, StreamingCubeSegment> activeSegments = new ConcurrentSkipListMap<>();
+    private final Map<Long, StreamingCubeSegment> immutableSegments = new ConcurrentSkipListMap<>();
     private Map<Long, ISourcePosition> segmentSourceStartPositions = Maps.newConcurrentMap();
 
     private final ISourcePositionHandler sourcePositionHandler;