Posted to commits@kylin.apache.org by ni...@apache.org on 2020/01/27 14:16:48 UTC

[kylin] 02/05: minor, sonar issues for unchecked stream read

This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 17e6c565529f2c04bd883bb12fea67e4069dfc05
Author: etherge <et...@163.com>
AuthorDate: Thu Jan 23 17:51:50 2020 -0500

    minor, sonar issues for unchecked stream read
---
 .../apache/kylin/gridtable/GTAggregateScanner.java | 15 ++---
 .../flink/util/PercentileCounterSerializer.java    | 18 ++++--
 .../spark/util/PercentileCounterSerializer.java    | 11 +++-
 .../core/storage/columnar/FragmentFilesMerger.java | 72 ++++++++++++++--------
 4 files changed, 78 insertions(+), 38 deletions(-)
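
For context on the pattern applied across these files: InputStream.read(byte[]) is allowed to return after filling only part of the buffer, so ignoring its return value can leave trailing unset bytes that silently corrupt the decoded key, value, or register data. DataInputStream.readFully(byte[]) either fills the whole array or throws EOFException; where only a plain InputStream-style source is available, a fill loop like the one introduced in FragmentFilesMerger is the usual substitute. The following is a minimal sketch of the three variants, not part of the commit, using only java.io; class and method names are illustrative only.

    import java.io.ByteArrayInputStream;
    import java.io.DataInputStream;
    import java.io.EOFException;
    import java.io.IOException;
    import java.io.InputStream;

    // Illustrative sketch, not part of this commit: contrasts an unchecked read
    // with the two patterns used in this change (readFully and a fill loop).
    public class UncheckedReadSketch {

        // Unsafe: read() may fill only part of buf in a single call.
        static byte[] readUnchecked(InputStream in, int len) throws IOException {
            byte[] buf = new byte[len];
            in.read(buf); // return value ignored -> flagged by Sonar as an unchecked stream read
            return buf;
        }

        // Safe: readFully blocks until the buffer is full or throws EOFException.
        static byte[] readWithDataInput(DataInputStream in, int len) throws IOException {
            byte[] buf = new byte[len];
            in.readFully(buf);
            return buf;
        }

        // Safe: manual fill loop for sources that only expose InputStream.read.
        static byte[] readWithLoop(InputStream in, int len) throws IOException {
            byte[] buf = new byte[len];
            int offset = 0;
            while (offset < len) {
                int n = in.read(buf, offset, len - offset);
                if (n == -1) {
                    throw new EOFException("stream ended after " + offset + " of " + len + " bytes");
                }
                offset += n;
            }
            return buf;
        }

        public static void main(String[] args) throws IOException {
            byte[] data = new byte[] { 1, 2, 3, 4, 5 };
            byte[] a = readWithDataInput(new DataInputStream(new ByteArrayInputStream(data)), data.length);
            byte[] b = readWithLoop(new ByteArrayInputStream(data), data.length);
            System.out.println(a.length + " " + b.length); // prints: 5 5
        }
    }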

diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 75e79d0..78e413a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -434,8 +434,8 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
                 sumSpilledSize += dump.size();
                 // when spilled data is too much, we can modify it by other strategy.
                 // this means, all spilled data is bigger than half of original spillThreshold.
-                if(sumSpilledSize > spillThreshold) {
-                    for(Dump current : dumps) {
+                if (sumSpilledSize > spillThreshold) {
+                    for (Dump current : dumps) {
                         current.spill();
                     }
                     spillThreshold += sumSpilledSize;
@@ -678,7 +678,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
                                 + (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
                     }
 
-                    if(spillBuffer == null) {
+                    if (spillBuffer == null) {
                         dis = new DataInputStream(new FileInputStream(dumpedFile));
                     } else {
                         dis = new DataInputStream(new ByteArrayInputStream(spillBuffer));
@@ -698,10 +698,10 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
                                 cursorIdx++;
                                 int keyLen = dis.readInt();
                                 byte[] key = new byte[keyLen];
-                                dis.read(key);
+                                dis.readFully(key);
                                 int valueLen = dis.readInt();
                                 byte[] value = new byte[valueLen];
-                                dis.read(value);
+                                dis.readFully(value);
                                 return new Pair<>(key, value);
                             } catch (Exception e) {
                                 throw new RuntimeException(
@@ -720,7 +720,8 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
             }
 
             public void spill() throws IOException {
-                if(spillBuffer == null) return;
+                if (spillBuffer == null)
+                    return;
                 OutputStream ops = new FileOutputStream(dumpedFile);
                 InputStream ips = new ByteArrayInputStream(spillBuffer);
                 IOUtils.copy(ips, ops);
@@ -729,7 +730,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
                 IOUtils.closeQuietly(ops);
 
                 logger.info("Spill buffer to disk, location: {}, size = {}.", dumpedFile.getAbsolutePath(),
-                    dumpedFile.length());
+                        dumpedFile.length());
             }
 
             public int size() {
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java
index 684f85e..a0d8f1c 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java
@@ -19,13 +19,14 @@
 
 package org.apache.kylin.engine.flink.util;
 
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.measure.percentile.PercentileCounter;
+
 import com.esotericsoftware.kryo.Kryo;
 import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
-import org.apache.kylin.measure.percentile.PercentileCounter;
-
-import java.nio.ByteBuffer;
 
 /**
  * A customized kryo serializer for {@link PercentileCounter}
@@ -49,7 +50,16 @@ public class PercentileCounterSerializer extends Serializer<PercentileCounter> {
         double quantileRatio = input.readDouble();
         int length = input.readInt();
         byte[] buffer = new byte[length];
-        input.read(buffer);
+
+        int offset = 0;
+        int bytesRead;
+        while ((bytesRead = input.read(buffer, offset, buffer.length - offset)) != -1) {
+            offset += bytesRead;
+            if (offset >= buffer.length) {
+                break;
+            }
+        }
+
         PercentileCounter counter = new PercentileCounter(compression, quantileRatio);
         counter.readRegisters(ByteBuffer.wrap(buffer));
         return counter;
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java
index c9ba0f3..1e256f2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java
@@ -47,7 +47,16 @@ public class PercentileCounterSerializer extends Serializer<PercentileCounter> {
         double quantileRatio = input.readDouble();
         int length = input.readInt();
         byte[] buffer = new byte[length];
-        input.read(buffer);
+
+        int offset = 0;
+        int bytesRead;
+        while ((bytesRead = input.read(buffer, offset, buffer.length - offset)) != -1) {
+            offset += bytesRead;
+            if (offset >= buffer.length) {
+                break;
+            }
+        }
+
         PercentileCounter counter = new PercentileCounter(compression, quantileRatio);
         counter.readRegisters(ByteBuffer.wrap(buffer));
         return counter;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java
index 4a9c398..627781b 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java
@@ -93,8 +93,8 @@ public class FragmentFilesMerger {
         }
 
         Collections.sort(fragmentList);
-        FragmentId mergedFragmentId = new FragmentId(fragmentList.get(0).getFragmentId().getStartId(), fragmentList
-                .get(fragmentList.size() - 1).getFragmentId().getEndId());
+        FragmentId mergedFragmentId = new FragmentId(fragmentList.get(0).getFragmentId().getStartId(),
+                fragmentList.get(fragmentList.size() - 1).getFragmentId().getEndId());
         List<FragmentData> fragmentDataList = Lists.newArrayList();
         Map<TblColRef, List<Dictionary<String>>> dimDictListMap = Maps.newHashMap();
         Map<FragmentId, Map<TblColRef, Dictionary<String>>> fragmentDictionaryMaps = Maps.newHashMap();
@@ -146,8 +146,8 @@ public class FragmentFilesMerger {
         File mergedFragmentMetaFile = new File(mergeWorkingDirectory, mergedFragmentId + Constants.META_FILE_SUFFIX);
         try {
             FragmentMetaInfo mergedFragmentMeta = new FragmentMetaInfo();
-            CountingOutputStream fragmentDataOutput = new CountingOutputStream(new BufferedOutputStream(
-                    FileUtils.openOutputStream(mergedFragmentDataFile)));
+            CountingOutputStream fragmentDataOutput = new CountingOutputStream(
+                    new BufferedOutputStream(FileUtils.openOutputStream(mergedFragmentDataFile)));
             // merge dictionaries
             Map<TblColRef, Dictionary<String>> mergedDictMap = mergeAndPersistDictionaries(mergedFragmentMeta,
                     dimDictListMap, fragmentDataOutput);
@@ -202,8 +202,8 @@ public class FragmentFilesMerger {
         List<DimDictionaryMetaInfo> dimDictionaryMetaInfos = Lists.newArrayList();
         for (TblColRef dimension : parsedCubeInfo.dimensionsUseDictEncoding) {
             List<Dictionary<String>> dicts = dimDictListMap.get(dimension);
-            MultipleDictionaryValueEnumerator multipleDictionaryValueEnumerator = new MultipleDictionaryValueEnumerator(dimension.getType(),
-                    dicts);
+            MultipleDictionaryValueEnumerator multipleDictionaryValueEnumerator = new MultipleDictionaryValueEnumerator(
+                    dimension.getType(), dicts);
             Dictionary<String> mergedDict = DictionaryGenerator.buildDictionary(dimension.getType(),
                     multipleDictionaryValueEnumerator);
             mergedDictMap.put(dimension, mergedDict);
@@ -240,10 +240,10 @@ public class FragmentFilesMerger {
             } else {
                 cuboidMetaInfo = fragmentMetaInfo.getCuboidMetaInfo(cuboidId);
             }
-            Map<TblColRef, Dictionary<String>> dictMap = fragmentDictionaryMaps.get(FragmentId.parse(fragmentMetaInfo
-                    .getFragmentId()));
-            DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo.getDimensionEncodings(
-                    parsedCubeInfo.cubeDesc, dimensions, dictMap);
+            Map<TblColRef, Dictionary<String>> dictMap = fragmentDictionaryMaps
+                    .get(FragmentId.parse(fragmentMetaInfo.getFragmentId()));
+            DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo
+                    .getDimensionEncodings(parsedCubeInfo.cubeDesc, dimensions, dictMap);
             FragmentCuboidReader fragmentCuboidReader = new FragmentCuboidReader(parsedCubeInfo.cubeDesc, fragmentData,
                     cuboidMetaInfo, cuboidInfo.getDimensions(), parsedCubeInfo.measureDescs, dimensionEncodings);
             fragmentCuboidReaders.add(fragmentCuboidReader);
@@ -264,7 +264,8 @@ public class FragmentFilesMerger {
                 if (dict instanceof TrieDictionary) {
                     invertIndexColDescs[i] = new SeqIIColumnDescriptor(dim.getName(), dict.getMinId(), dict.getMaxId());
                 } else {
-                    invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(), encoding.getLengthOfEncoding());
+                    invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(),
+                            encoding.getLengthOfEncoding());
                 }
             } else {
                 invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(), encoding.getLengthOfEncoding());
@@ -282,8 +283,8 @@ public class FragmentFilesMerger {
         for (int i = 0; i < metricDataWriters.length; i++) {
             metricDataWriters[i] = new CuboidMetricDataWriter(cuboidId, parsedCubeInfo.measureDescs[i].getName(),
                     parsedCubeInfo.getMeasureTypeSerializer(i).maxLength());
-            metricsEncodings[i] = ColumnarMetricsEncodingFactory.create(parsedCubeInfo.measureDescs[i].getFunction()
-                    .getReturnDataType());
+            metricsEncodings[i] = ColumnarMetricsEncodingFactory
+                    .create(parsedCubeInfo.measureDescs[i].getFunction().getReturnDataType());
         }
 
         FragmentCuboidDataMerger fragmentCuboidDataMerger = new FragmentCuboidDataMerger(cuboidInfo,
@@ -324,19 +325,29 @@ public class FragmentFilesMerger {
         for (int i = 0; i < dimDataWriters.length; i++) {
             DimensionEncoding encoding = mergedDimEncodings[i];
             int dimFixLen = encoding.getLengthOfEncoding();
-            InputStream dimInput = new BufferedInputStream(FileUtils.openInputStream(dimDataWriters[i].getOutputFile()));
+            InputStream dimInput = new BufferedInputStream(
+                    FileUtils.openInputStream(dimDataWriters[i].getOutputFile()));
             try {
                 DimensionMetaInfo dimensionMeta = new DimensionMetaInfo();
                 dimensionMeta.setName(dimensions[i].getName());
                 int startOffset = (int) fragmentDataOutput.getCount();
                 dimensionMeta.setStartOffset(startOffset);
 
-                ColumnarStoreDimDesc cStoreDimDesc = ColumnarStoreDimDesc.getDefaultCStoreDimDesc(parsedCubeInfo.cubeDesc,
-                        dimensions[i].getName(), encoding);
+                ColumnarStoreDimDesc cStoreDimDesc = ColumnarStoreDimDesc
+                        .getDefaultCStoreDimDesc(parsedCubeInfo.cubeDesc, dimensions[i].getName(), encoding);
                 ColumnDataWriter columnDataWriter = cStoreDimDesc.getDimWriter(fragmentDataOutput, rowCnt);
                 for (int j = 0; j < rowCnt; j++) {
                     byte[] dimValue = new byte[dimFixLen];
-                    dimInput.read(dimValue);
+
+                    int offset = 0;
+                    int bytesRead;
+                    while ((bytesRead = dimInput.read(dimValue, offset, dimValue.length - offset)) != -1) {
+                        offset += bytesRead;
+                        if (offset >= dimValue.length) {
+                            break;
+                        }
+                    }
+
                     if (DimensionEncoding.isNull(dimValue, 0, dimValue.length)) {
                         dimensionMeta.setHasNull(true);
                     }
@@ -358,8 +369,8 @@ public class FragmentFilesMerger {
         }
 
         for (int i = 0; i < metricDataWriters.length; i++) {
-            DataInputStream metricInput = new DataInputStream(new BufferedInputStream(
-                    FileUtils.openInputStream(metricDataWriters[i].getOutputFile())));
+            DataInputStream metricInput = new DataInputStream(
+                    new BufferedInputStream(FileUtils.openInputStream(metricDataWriters[i].getOutputFile())));
             try {
                 ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory
                         .create(parsedCubeInfo.measureDescs[i].getFunction().getReturnDataType());
@@ -373,7 +384,16 @@ public class FragmentFilesMerger {
                 for (int j = 0; j < rowCnt; j++) {
                     int metricLen = metricInput.readInt();
                     byte[] metricValue = new byte[metricLen];
-                    metricInput.read(metricValue);
+
+                    int offset = 0;
+                    int bytesRead;
+                    while ((bytesRead = metricInput.read(metricValue, offset, metricValue.length - offset)) != -1) {
+                        offset += bytesRead;
+                        if (offset >= metricValue.length) {
+                            break;
+                        }
+                    }
+
                     columnDataWriter.write(metricValue);
                 }
                 columnDataWriter.flush();
@@ -404,7 +424,8 @@ public class FragmentFilesMerger {
             this.colName = colName;
 
             this.tmpColDataFile = new File(mergeWorkingDirectory, cuboidId + "-" + colName + ".data");
-            this.output = new CountingOutputStream(new BufferedOutputStream(FileUtils.openOutputStream(tmpColDataFile)));
+            this.output = new CountingOutputStream(
+                    new BufferedOutputStream(FileUtils.openOutputStream(tmpColDataFile)));
         }
 
         public void write(byte[] value) throws IOException {
@@ -437,8 +458,8 @@ public class FragmentFilesMerger {
             this.metricName = metricName;
             this.maxValLen = maxValLen;
             this.tmpMetricDataFile = new File(mergeWorkingDirectory, cuboidId + "-" + metricName + ".data");
-            this.countingOutput = new CountingOutputStream(new BufferedOutputStream(
-                    FileUtils.openOutputStream(tmpMetricDataFile)));
+            this.countingOutput = new CountingOutputStream(
+                    new BufferedOutputStream(FileUtils.openOutputStream(tmpMetricDataFile)));
             this.output = new DataOutputStream(countingOutput);
         }
 
@@ -534,9 +555,8 @@ public class FragmentFilesMerger {
             enqueueFromFragment(currRecordEntry.getSecond());
             boolean needAggregate = false;
             boolean first = true;
-            while ((!minHeap.isEmpty())
-                    && StringArrayComparator.INSTANCE.compare(currRecord.dimensions,
-                            minHeap.peek().getFirst().dimensions) == 0) {
+            while ((!minHeap.isEmpty()) && StringArrayComparator.INSTANCE.compare(currRecord.dimensions,
+                    minHeap.peek().getFirst().dimensions) == 0) {
                 if (first) {
                     doAggregate(currRecord);
                     first = false;