You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by sh...@apache.org on 2018/10/23 01:56:50 UTC

[kylin] branch master updated: KYLIN-3597 Refactor methods to reduce Cognitive Complexity

This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new c5394b4  KYLIN-3597 Refactor methods to reduce Cognitive Complexity
c5394b4 is described below

commit c5394b432f59e3552ff51b2e20c27ec38838184c
Author: ZhouYC627 <zh...@gmail.com>
AuthorDate: Thu Oct 18 22:42:06 2018 +0800

    KYLIN-3597 Refactor methods to reduce Cognitive Complexity
---
 .../java/org/apache/kylin/cube/CubeManager.java    | 163 ++++++++++++---------
 .../hbase/util/GridTableHBaseBenchmark.java        |  25 ++--
 2 files changed, 104 insertions(+), 84 deletions(-)

diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index dbd1955..7e3be40 100755
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -323,35 +323,48 @@ public class CubeManager implements IRealizationProvider {
 
         List<String> toRemoveResources = Lists.newArrayList();
         if (update.getToRemoveSegs() != null) {
-            Iterator<CubeSegment> iterator = newSegs.iterator();
-            while (iterator.hasNext()) {
-                CubeSegment currentSeg = iterator.next();
-                for (CubeSegment toRemoveSeg : update.getToRemoveSegs()) {
-                    if (currentSeg.getUuid().equals(toRemoveSeg.getUuid())) {
-                        logger.info("Remove segment {}", currentSeg);
-                        toRemoveResources.add(currentSeg.getStatisticsResourcePath());
-                        iterator.remove();
-                        break;
-                    }
-                }
-            }
+            processToRemoveSegments(update, newSegs, toRemoveResources);
         }
 
         if (update.getToUpdateSegs() != null) {
-            for (CubeSegment segment : update.getToUpdateSegs()) {
-                for (int i = 0; i < newSegs.size(); i++) {
-                    if (newSegs.get(i).getUuid().equals(segment.getUuid())) {
-                        newSegs.set(i, segment);
-                        break;
-                    }
-                }
-            }
+            processToUpdateSegments(update, newSegs);
         }
 
         Collections.sort(newSegs);
         newSegs.validate();
         cube.setSegments(newSegs);
 
+        setCubeMember(cube, update);
+
+        try {
+            cube = crud.save(cube);
+        } catch (WriteConflictException ise) {
+            logger.warn("Write conflict to update cube {} at try {}, will retry...", cube.getName(), retry);
+            if (retry >= 7) {
+                logger.error("Retried 7 times till got error, abandoning...", ise);
+                throw ise;
+            }
+
+            cube = crud.reload(cube.getName());
+            update.setCubeInstance(cube.latestCopyForWrite());
+            return updateCubeWithRetry(update, ++retry);
+        }
+
+        for (String resource : toRemoveResources) {
+            try {
+                getStore().deleteResource(resource);
+            } catch (IOException ioe) {
+                logger.error("Failed to delete resource {}", toRemoveResources);
+            }
+        }
+
+        //this is a duplicate call to take care of scenarios where REST cache service unavailable
+        ProjectManager.getInstance(cube.getConfig()).clearL2Cache(cube.getProject());
+
+        return cube;
+    }
+
+    private void setCubeMember(CubeInstance cube, CubeUpdate update) {
         if (update.getStatus() != null) {
             cube.setStatus(update.getStatus());
         }
@@ -377,35 +390,32 @@ public class CubeManager implements IRealizationProvider {
                 cube.putSnapshotResPath(lookupSnapshotPathEntry.getKey(), lookupSnapshotPathEntry.getValue());
             }
         }
+    }
 
-        try {
-            cube = crud.save(cube);
-        } catch (WriteConflictException ise) {
-            logger.warn("Write conflict to update cube {} at try {}, will retry...", cube.getName(), retry);
-            if (retry >= 7) {
-                logger.error("Retried 7 times till got error, abandoning...", ise);
-                throw ise;
+    private void processToUpdateSegments(CubeUpdate update, Segments<CubeSegment> newSegs) { // replaces segments in newSegs in place; the call site null-checks getToUpdateSegs() first
+        for (CubeSegment segment : update.getToUpdateSegs()) { // an entry with no UUID match in newSegs is left unapplied
+            for (int i = 0; i < newSegs.size(); i++) {
+                if (newSegs.get(i).getUuid().equals(segment.getUuid())) { // segments are matched by UUID
+                    newSegs.set(i, segment); // swap in the updated segment at the same index
+                    break; // stop at the first match for this segment
+                }
             }
-
-            cube = crud.reload(cube.getName());
-            update.setCubeInstance(cube.latestCopyForWrite());
-            return updateCubeWithRetry(update, ++retry);
         }
+    }
 
-        if (toRemoveResources.size() > 0) {
-            for (String resource : toRemoveResources) {
-                try {
-                    getStore().deleteResource(resource);
-                } catch (IOException ioe) {
-                    logger.error("Failed to delete resource {}", toRemoveResources);
+    private void processToRemoveSegments(CubeUpdate update, Segments<CubeSegment> newSegs, List<String> toRemoveResources) {
+        Iterator<CubeSegment> iterator = newSegs.iterator(); // explicit iterator so matches can be removed during traversal
+        while (iterator.hasNext()) {
+            CubeSegment currentSeg = iterator.next();
+            for (CubeSegment toRemoveSeg : update.getToRemoveSegs()) {
+                if (currentSeg.getUuid().equals(toRemoveSeg.getUuid())) { // segments are matched by UUID
+                    logger.info("Remove segment {}", currentSeg);
+                    toRemoveResources.add(currentSeg.getStatisticsResourcePath()); // output list; the caller deletes these resources after saving the cube
+                    iterator.remove(); // drop the matched segment from newSegs
+                    break; // currentSeg is already removed, no need to compare it further
                 }
             }
         }
-
-        //this is a duplicate call to take care of scenarios where REST cache service unavailable
-        ProjectManager.getInstance(cube.getConfig()).clearL2Cache(cube.getProject());
-
-        return cube;
     }
 
     // for test
@@ -746,15 +756,7 @@ public class CubeManager implements IRealizationProvider {
 
             if (cubeCopy.getSegments().getFirstSegment().isOffsetCube()) {
                 // offset cube, merge by date range?
-                if (segRange == null && tsRange != null) {
-                    Pair<CubeSegment, CubeSegment> pair = cubeCopy.getSegments(SegmentStatusEnum.READY)
-                            .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
-                    if (pair == null)
-                        throw new IllegalArgumentException(
-                                "Find no segments to merge by " + tsRange + " for cube " + cubeCopy);
-                    segRange = new SegmentRange(pair.getFirst().getSegRange().start,
-                            pair.getSecond().getSegRange().end);
-                }
+                segRange = getOffsetCubeSegRange(cubeCopy, tsRange, segRange);
                 tsRange = null;
                 Preconditions.checkArgument(segRange != null);
             } else {
@@ -779,12 +781,9 @@ public class CubeManager implements IRealizationProvider {
             CubeSegment first = mergingSegments.get(0);
             CubeSegment last = mergingSegments.get(mergingSegments.size() - 1);
             if (force == false) {
-                for (int i = 0; i < mergingSegments.size() - 1; i++) {
-                    if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange()))
-                        throw new IllegalStateException("Merging segments must not have gaps between "
-                                + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1));
-                }
+                checkReadyForMerge(mergingSegments);
             }
+
             if (first.isOffsetCube()) {
                 newSegment.setSegRange(new SegmentRange(first.getSegRange().start, last.getSegRange().end));
                 newSegment.setSourcePartitionOffsetStart(first.getSourcePartitionOffsetStart());
@@ -795,21 +794,6 @@ public class CubeManager implements IRealizationProvider {
                 newSegment.setSegRange(null);
             }
 
-            if (force == false) {
-                List<String> emptySegment = Lists.newArrayList();
-                for (CubeSegment seg : mergingSegments) {
-                    if (seg.getSizeKB() == 0 && seg.getInputRecords() == 0) {
-                        emptySegment.add(seg.getName());
-                    }
-                }
-
-                if (emptySegment.size() > 0) {
-                    throw new IllegalArgumentException(
-                            "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
-                                    + emptySegment);
-                }
-            }
-
             validateNewSegments(cubeCopy, newSegment);
 
             CubeUpdate update = new CubeUpdate(cubeCopy);
@@ -819,6 +803,43 @@ public class CubeManager implements IRealizationProvider {
             return newSegment;
         }
 
+        private void checkReadyForMerge(Segments<CubeSegment> mergingSegments) {
+            // Pre-merge validation; the call site in this patch invokes it only when force == false.
+            // Each segment must connect to the next one: throws IllegalStateException naming the first pair with a gap.
+            for (int i = 0; i < mergingSegments.size() - 1; i++) {
+                if (!mergingSegments.get(i).getSegRange().connects(mergingSegments.get(i + 1).getSegRange()))
+                    throw new IllegalStateException("Merging segments must not have gaps between "
+                            + mergingSegments.get(i) + " and " + mergingSegments.get(i + 1));
+            }
+
+            // Collect the names of all empty segments (size 0 KB and 0 input records) so the error below lists every one.
+            List<String> emptySegment = Lists.newArrayList();
+            for (CubeSegment seg : mergingSegments) {
+                if (seg.getSizeKB() == 0 && seg.getInputRecords() == 0) {
+                    emptySegment.add(seg.getName());
+                }
+            }
+
+            if (emptySegment.size() > 0) {
+                throw new IllegalArgumentException(
+                        "Empty cube segment found, couldn't merge unless 'forceMergeEmptySegment' set to true: "
+                                + emptySegment);
+            }
+        }
+
+        private SegmentRange getOffsetCubeSegRange(CubeInstance cubeCopy, TSRange tsRange, SegmentRange segRange) {
+            if (segRange == null && tsRange != null) { // derive a range only when none was passed in but a time range was
+                Pair<CubeSegment, CubeSegment> pair = cubeCopy.getSegments(SegmentStatusEnum.READY)
+                        .findMergeOffsetsByDateRange(tsRange, Long.MAX_VALUE);
+                if (pair == null) // the lookup over READY segments returned nothing for tsRange
+                    throw new IllegalArgumentException(
+                            "Find no segments to merge by " + tsRange + " for cube " + cubeCopy);
+                segRange = new SegmentRange(pair.getFirst().getSegRange().start, // from the first segment's start ...
+                        pair.getSecond().getSegRange().end); // ... to the second segment's end
+            }
+            return segRange; // the derived range, or the incoming segRange (possibly null) when nothing was derived
+        }
+
         private void checkInputRanges(TSRange tsRange, SegmentRange segRange) {
             if (tsRange != null && segRange != null) {
                 throw new IllegalArgumentException(
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
index 6dd16fe..fb944dc 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/util/GridTableHBaseBenchmark.java
@@ -172,19 +172,18 @@ public class GridTableHBaseBenchmark {
 
             int i = 0;
             while (i < N_ROWS) {
-                int start;
-                int end;
-                for (start = i; start < N_ROWS; start++) {
-                    if (hits[start])
-                        break;
-                }
-                for (end = start + 1; end < N_ROWS; end++) {
-                    boolean isEnd = true;
-                    for (int j = 0; j < jumpThreshold && end + j < N_ROWS; j++)
-                        if (hits[end + j])
-                            isEnd = false;
-                    if (isEnd)
-                        break;
+                // find the first hit
+                int start = i;
+                while (start + 1 < N_ROWS && !hits[start]) start++;
+
+                // find the last hit within jumpThreshold
+                int end = start + 1;
+                int jump = end + 1;
+                while (jump < N_ROWS && (end + jumpThreshold > jump)) {
+                    if (hits[jump]) {
+                        end = jump;
+                    }
+                    jump++;
                 }
 
                 if (start < N_ROWS) {