You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by ni...@apache.org on 2020/01/27 14:16:48 UTC
[kylin] 02/05: minor, sonar issues for unchecked stream read
This is an automated email from the ASF dual-hosted git repository.
nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 17e6c565529f2c04bd883bb12fea67e4069dfc05
Author: etherge <et...@163.com>
AuthorDate: Thu Jan 23 17:51:50 2020 -0500
minor, sonar issues for unchecked stream read
---
.../apache/kylin/gridtable/GTAggregateScanner.java | 15 ++---
.../flink/util/PercentileCounterSerializer.java | 18 ++++--
.../spark/util/PercentileCounterSerializer.java | 11 +++-
.../core/storage/columnar/FragmentFilesMerger.java | 72 ++++++++++++++--------
4 files changed, 78 insertions(+), 38 deletions(-)
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 75e79d0..78e413a 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -434,8 +434,8 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
sumSpilledSize += dump.size();
// when spilled data is too much, we can modify it by other strategy.
// this means, all spilled data is bigger than half of original spillThreshold.
- if(sumSpilledSize > spillThreshold) {
- for(Dump current : dumps) {
+ if (sumSpilledSize > spillThreshold) {
+ for (Dump current : dumps) {
current.spill();
}
spillThreshold += sumSpilledSize;
@@ -678,7 +678,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
+ (dumpedFile == null ? "<null>" : dumpedFile.getAbsolutePath()));
}
- if(spillBuffer == null) {
+ if (spillBuffer == null) {
dis = new DataInputStream(new FileInputStream(dumpedFile));
} else {
dis = new DataInputStream(new ByteArrayInputStream(spillBuffer));
@@ -698,10 +698,10 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
cursorIdx++;
int keyLen = dis.readInt();
byte[] key = new byte[keyLen];
- dis.read(key);
+ dis.readFully(key);
int valueLen = dis.readInt();
byte[] value = new byte[valueLen];
- dis.read(value);
+ dis.readFully(value);
return new Pair<>(key, value);
} catch (Exception e) {
throw new RuntimeException(
@@ -720,7 +720,8 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
}
public void spill() throws IOException {
- if(spillBuffer == null) return;
+ if (spillBuffer == null)
+ return;
OutputStream ops = new FileOutputStream(dumpedFile);
InputStream ips = new ByteArrayInputStream(spillBuffer);
IOUtils.copy(ips, ops);
@@ -729,7 +730,7 @@ public class GTAggregateScanner implements IGTScanner, IGTBypassChecker {
IOUtils.closeQuietly(ops);
logger.info("Spill buffer to disk, location: {}, size = {}.", dumpedFile.getAbsolutePath(),
- dumpedFile.length());
+ dumpedFile.length());
}
public int size() {
diff --git a/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java b/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java
index 684f85e..a0d8f1c 100644
--- a/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java
+++ b/engine-flink/src/main/java/org/apache/kylin/engine/flink/util/PercentileCounterSerializer.java
@@ -19,13 +19,14 @@
package org.apache.kylin.engine.flink.util;
+import java.nio.ByteBuffer;
+
+import org.apache.kylin.measure.percentile.PercentileCounter;
+
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
-import org.apache.kylin.measure.percentile.PercentileCounter;
-
-import java.nio.ByteBuffer;
/**
* A customized kryo serializer for {@link PercentileCounter}
@@ -49,7 +50,16 @@ public class PercentileCounterSerializer extends Serializer<PercentileCounter> {
double quantileRatio = input.readDouble();
int length = input.readInt();
byte[] buffer = new byte[length];
- input.read(buffer);
+
+ int offset = 0;
+ int bytesRead;
+ while ((bytesRead = input.read(buffer, offset, buffer.length - offset)) != -1) {
+ offset += bytesRead;
+ if (offset >= buffer.length) {
+ break;
+ }
+ }
+
PercentileCounter counter = new PercentileCounter(compression, quantileRatio);
counter.readRegisters(ByteBuffer.wrap(buffer));
return counter;
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java
index c9ba0f3..1e256f2 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/util/PercentileCounterSerializer.java
@@ -47,7 +47,16 @@ public class PercentileCounterSerializer extends Serializer<PercentileCounter> {
double quantileRatio = input.readDouble();
int length = input.readInt();
byte[] buffer = new byte[length];
- input.read(buffer);
+
+ int offset = 0;
+ int bytesRead;
+ while ((bytesRead = input.read(buffer, offset, buffer.length - offset)) != -1) {
+ offset += bytesRead;
+ if (offset >= buffer.length) {
+ break;
+ }
+ }
+
PercentileCounter counter = new PercentileCounter(compression, quantileRatio);
counter.readRegisters(ByteBuffer.wrap(buffer));
return counter;
diff --git a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java
index 4a9c398..627781b 100644
--- a/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java
+++ b/stream-core/src/main/java/org/apache/kylin/stream/core/storage/columnar/FragmentFilesMerger.java
@@ -93,8 +93,8 @@ public class FragmentFilesMerger {
}
Collections.sort(fragmentList);
- FragmentId mergedFragmentId = new FragmentId(fragmentList.get(0).getFragmentId().getStartId(), fragmentList
- .get(fragmentList.size() - 1).getFragmentId().getEndId());
+ FragmentId mergedFragmentId = new FragmentId(fragmentList.get(0).getFragmentId().getStartId(),
+ fragmentList.get(fragmentList.size() - 1).getFragmentId().getEndId());
List<FragmentData> fragmentDataList = Lists.newArrayList();
Map<TblColRef, List<Dictionary<String>>> dimDictListMap = Maps.newHashMap();
Map<FragmentId, Map<TblColRef, Dictionary<String>>> fragmentDictionaryMaps = Maps.newHashMap();
@@ -146,8 +146,8 @@ public class FragmentFilesMerger {
File mergedFragmentMetaFile = new File(mergeWorkingDirectory, mergedFragmentId + Constants.META_FILE_SUFFIX);
try {
FragmentMetaInfo mergedFragmentMeta = new FragmentMetaInfo();
- CountingOutputStream fragmentDataOutput = new CountingOutputStream(new BufferedOutputStream(
- FileUtils.openOutputStream(mergedFragmentDataFile)));
+ CountingOutputStream fragmentDataOutput = new CountingOutputStream(
+ new BufferedOutputStream(FileUtils.openOutputStream(mergedFragmentDataFile)));
// merge dictionaries
Map<TblColRef, Dictionary<String>> mergedDictMap = mergeAndPersistDictionaries(mergedFragmentMeta,
dimDictListMap, fragmentDataOutput);
@@ -202,8 +202,8 @@ public class FragmentFilesMerger {
List<DimDictionaryMetaInfo> dimDictionaryMetaInfos = Lists.newArrayList();
for (TblColRef dimension : parsedCubeInfo.dimensionsUseDictEncoding) {
List<Dictionary<String>> dicts = dimDictListMap.get(dimension);
- MultipleDictionaryValueEnumerator multipleDictionaryValueEnumerator = new MultipleDictionaryValueEnumerator(dimension.getType(),
- dicts);
+ MultipleDictionaryValueEnumerator multipleDictionaryValueEnumerator = new MultipleDictionaryValueEnumerator(
+ dimension.getType(), dicts);
Dictionary<String> mergedDict = DictionaryGenerator.buildDictionary(dimension.getType(),
multipleDictionaryValueEnumerator);
mergedDictMap.put(dimension, mergedDict);
@@ -240,10 +240,10 @@ public class FragmentFilesMerger {
} else {
cuboidMetaInfo = fragmentMetaInfo.getCuboidMetaInfo(cuboidId);
}
- Map<TblColRef, Dictionary<String>> dictMap = fragmentDictionaryMaps.get(FragmentId.parse(fragmentMetaInfo
- .getFragmentId()));
- DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo.getDimensionEncodings(
- parsedCubeInfo.cubeDesc, dimensions, dictMap);
+ Map<TblColRef, Dictionary<String>> dictMap = fragmentDictionaryMaps
+ .get(FragmentId.parse(fragmentMetaInfo.getFragmentId()));
+ DimensionEncoding[] dimensionEncodings = ParsedStreamingCubeInfo
+ .getDimensionEncodings(parsedCubeInfo.cubeDesc, dimensions, dictMap);
FragmentCuboidReader fragmentCuboidReader = new FragmentCuboidReader(parsedCubeInfo.cubeDesc, fragmentData,
cuboidMetaInfo, cuboidInfo.getDimensions(), parsedCubeInfo.measureDescs, dimensionEncodings);
fragmentCuboidReaders.add(fragmentCuboidReader);
@@ -264,7 +264,8 @@ public class FragmentFilesMerger {
if (dict instanceof TrieDictionary) {
invertIndexColDescs[i] = new SeqIIColumnDescriptor(dim.getName(), dict.getMinId(), dict.getMaxId());
} else {
- invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(), encoding.getLengthOfEncoding());
+ invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(),
+ encoding.getLengthOfEncoding());
}
} else {
invertIndexColDescs[i] = new FixLenIIColumnDescriptor(dim.getName(), encoding.getLengthOfEncoding());
@@ -282,8 +283,8 @@ public class FragmentFilesMerger {
for (int i = 0; i < metricDataWriters.length; i++) {
metricDataWriters[i] = new CuboidMetricDataWriter(cuboidId, parsedCubeInfo.measureDescs[i].getName(),
parsedCubeInfo.getMeasureTypeSerializer(i).maxLength());
- metricsEncodings[i] = ColumnarMetricsEncodingFactory.create(parsedCubeInfo.measureDescs[i].getFunction()
- .getReturnDataType());
+ metricsEncodings[i] = ColumnarMetricsEncodingFactory
+ .create(parsedCubeInfo.measureDescs[i].getFunction().getReturnDataType());
}
FragmentCuboidDataMerger fragmentCuboidDataMerger = new FragmentCuboidDataMerger(cuboidInfo,
@@ -324,19 +325,29 @@ public class FragmentFilesMerger {
for (int i = 0; i < dimDataWriters.length; i++) {
DimensionEncoding encoding = mergedDimEncodings[i];
int dimFixLen = encoding.getLengthOfEncoding();
- InputStream dimInput = new BufferedInputStream(FileUtils.openInputStream(dimDataWriters[i].getOutputFile()));
+ InputStream dimInput = new BufferedInputStream(
+ FileUtils.openInputStream(dimDataWriters[i].getOutputFile()));
try {
DimensionMetaInfo dimensionMeta = new DimensionMetaInfo();
dimensionMeta.setName(dimensions[i].getName());
int startOffset = (int) fragmentDataOutput.getCount();
dimensionMeta.setStartOffset(startOffset);
- ColumnarStoreDimDesc cStoreDimDesc = ColumnarStoreDimDesc.getDefaultCStoreDimDesc(parsedCubeInfo.cubeDesc,
- dimensions[i].getName(), encoding);
+ ColumnarStoreDimDesc cStoreDimDesc = ColumnarStoreDimDesc
+ .getDefaultCStoreDimDesc(parsedCubeInfo.cubeDesc, dimensions[i].getName(), encoding);
ColumnDataWriter columnDataWriter = cStoreDimDesc.getDimWriter(fragmentDataOutput, rowCnt);
for (int j = 0; j < rowCnt; j++) {
byte[] dimValue = new byte[dimFixLen];
- dimInput.read(dimValue);
+
+ int offset = 0;
+ int bytesRead;
+ while ((bytesRead = dimInput.read(dimValue, offset, dimValue.length - offset)) != -1) {
+ offset += bytesRead;
+ if (offset >= dimValue.length) {
+ break;
+ }
+ }
+
if (DimensionEncoding.isNull(dimValue, 0, dimValue.length)) {
dimensionMeta.setHasNull(true);
}
@@ -358,8 +369,8 @@ public class FragmentFilesMerger {
}
for (int i = 0; i < metricDataWriters.length; i++) {
- DataInputStream metricInput = new DataInputStream(new BufferedInputStream(
- FileUtils.openInputStream(metricDataWriters[i].getOutputFile())));
+ DataInputStream metricInput = new DataInputStream(
+ new BufferedInputStream(FileUtils.openInputStream(metricDataWriters[i].getOutputFile())));
try {
ColumnarMetricsEncoding metricsEncoding = ColumnarMetricsEncodingFactory
.create(parsedCubeInfo.measureDescs[i].getFunction().getReturnDataType());
@@ -373,7 +384,16 @@ public class FragmentFilesMerger {
for (int j = 0; j < rowCnt; j++) {
int metricLen = metricInput.readInt();
byte[] metricValue = new byte[metricLen];
- metricInput.read(metricValue);
+
+ int offset = 0;
+ int bytesRead;
+ while ((bytesRead = metricInput.read(metricValue, offset, metricValue.length - offset)) != -1) {
+ offset += bytesRead;
+ if (offset >= metricValue.length) {
+ break;
+ }
+ }
+
columnDataWriter.write(metricValue);
}
columnDataWriter.flush();
@@ -404,7 +424,8 @@ public class FragmentFilesMerger {
this.colName = colName;
this.tmpColDataFile = new File(mergeWorkingDirectory, cuboidId + "-" + colName + ".data");
- this.output = new CountingOutputStream(new BufferedOutputStream(FileUtils.openOutputStream(tmpColDataFile)));
+ this.output = new CountingOutputStream(
+ new BufferedOutputStream(FileUtils.openOutputStream(tmpColDataFile)));
}
public void write(byte[] value) throws IOException {
@@ -437,8 +458,8 @@ public class FragmentFilesMerger {
this.metricName = metricName;
this.maxValLen = maxValLen;
this.tmpMetricDataFile = new File(mergeWorkingDirectory, cuboidId + "-" + metricName + ".data");
- this.countingOutput = new CountingOutputStream(new BufferedOutputStream(
- FileUtils.openOutputStream(tmpMetricDataFile)));
+ this.countingOutput = new CountingOutputStream(
+ new BufferedOutputStream(FileUtils.openOutputStream(tmpMetricDataFile)));
this.output = new DataOutputStream(countingOutput);
}
@@ -534,9 +555,8 @@ public class FragmentFilesMerger {
enqueueFromFragment(currRecordEntry.getSecond());
boolean needAggregate = false;
boolean first = true;
- while ((!minHeap.isEmpty())
- && StringArrayComparator.INSTANCE.compare(currRecord.dimensions,
- minHeap.peek().getFirst().dimensions) == 0) {
+ while ((!minHeap.isEmpty()) && StringArrayComparator.INSTANCE.compare(currRecord.dimensions,
+ minHeap.peek().getFirst().dimensions) == 0) {
if (first) {
doAggregate(currRecord);
first = false;