You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2020/07/19 12:00:58 UTC
[carbondata] branch master updated: [CARBONDATA-3889] Cleanup code
for carbondata-streaming module
This is an automated email from the ASF dual-hosted git repository.
manhua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 23e2760 [CARBONDATA-3889] Cleanup code for carbondata-streaming module
23e2760 is described below
commit 23e2760981e7674c88eae8319ec102d5b9adb544
Author: QiangCai <qi...@qq.com>
AuthorDate: Sat Jul 4 23:37:21 2020 +0800
[CARBONDATA-3889] Cleanup code for carbondata-streaming module
Why is this PR needed?
need cleanup code in carbondata-streaming module
What changes were proposed in this PR?
Cleanup code in carbondata-streaming module
Does this PR introduce any user interface change?
No
Yes. (please explain the change and update document)
Is any new testcase added?
No
Yes
This closes #3826
---
streaming/pom.xml | 19 +-
.../streaming/CarbonStreamRecordWriter.java | 6 +-
.../carbondata/streaming/StreamBlockletWriter.java | 22 +--
.../streaming/segment/StreamSegment.java | 210 +++++++++------------
.../streaming/parser/FieldConverter.scala | 2 +-
.../streaming/parser/RowStreamParserImp.scala | 16 +-
.../streaming/CarbonStreamOutputFormatTest.java | 12 +-
7 files changed, 132 insertions(+), 155 deletions(-)
diff --git a/streaming/pom.xml b/streaming/pom.xml
index 92cbed4..84fffea 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -1,3 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>carbondata-parent</artifactId>
@@ -95,7 +112,7 @@
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18</version>
- <!-- Note config is repeated in scalatest config -->
+ <!-- Note config is repeated in scala test config -->
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 3209e8d..8db1bd7 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -82,7 +82,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
private RowParser rowParser;
private BadRecordsLogger badRecordLogger;
private RowConverter converter;
- private CarbonRow currentRow = new CarbonRow(null);
+ private final CarbonRow currentRow = new CarbonRow(null);
// encoder
private DataField[] dataFields;
@@ -174,7 +174,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
if (carbonFile.exists()) {
// if the file is existed, use the append api
outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath);
- // get the compressor from the fileheader. In legacy store,
+ // get the compressor from the file header. In legacy store,
// the compressor name is not set and it use snappy compressor
FileHeader header = new CarbonHeaderReader(filePath).readHeader();
if (header.isSetCompressor_name()) {
@@ -329,7 +329,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
if (output.getRowIndex() == -1) {
return;
}
- output.apppendBlocklet(outputStream);
+ output.appendBlocklet(outputStream);
outputStream.flush();
if (!isClosed) {
batchMinMaxIndex = StreamSegment.mergeBlockletMinMax(
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
index 89bf7c5..0391525 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -44,14 +44,14 @@ public class StreamBlockletWriter {
private byte[] buffer;
private int maxSize;
private int maxRowNum;
- private int rowSize;
+ private final int rowSize;
private int count = 0;
private int rowIndex = -1;
- private Compressor compressor;
+ private final Compressor compressor;
- private int dimCountWithoutComplex;
- private int measureCount;
- private DataType[] measureDataTypes;
+ private final int dimCountWithoutComplex;
+ private final int measureCount;
+ private final DataType[] measureDataTypes;
// blocklet level stats
ColumnPageStatsCollector[] dimStatsCollectors;
@@ -93,11 +93,11 @@ public class StreamBlockletWriter {
}
private void ensureCapacity(int space) {
- int newcount = space + count;
- if (newcount > buffer.length) {
- byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
- System.arraycopy(buffer, 0, newbuf, 0, count);
- buffer = newbuf;
+ int newCount = space + count;
+ if (newCount > buffer.length) {
+ byte[] newBuffer = new byte[Math.max(newCount, buffer.length + rowSize)];
+ System.arraycopy(buffer, 0, newBuffer, 0, count);
+ buffer = newBuffer;
}
}
@@ -212,7 +212,7 @@ public class StreamBlockletWriter {
return blockletMinMaxIndex;
}
- void apppendBlocklet(DataOutputStream outputStream) throws IOException {
+ void appendBlocklet(DataOutputStream outputStream) throws IOException {
outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
BlockletInfo blockletInfo = new BlockletInfo();
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index c40698a..1a2ad0b 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.common.CarbonIterator;
import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
import org.apache.carbondata.core.datastore.impl.FileFactory;
import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
import org.apache.carbondata.core.locks.CarbonLockFactory;
@@ -94,22 +93,7 @@ public class StreamSegment {
}
}
if (null == streamSegment) {
- int segmentId = SegmentStatusManager.createNewSegmentId(details);
- LoadMetadataDetails newDetail = new LoadMetadataDetails();
- newDetail.setLoadName("" + segmentId);
- newDetail.setFileFormat(FileFormat.ROW_V1);
- newDetail.setLoadStartTime(System.currentTimeMillis());
- newDetail.setSegmentStatus(SegmentStatus.STREAMING);
-
- LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
- int i = 0;
- for (; i < details.length; i++) {
- newDetails[i] = details[i];
- }
- newDetails[i] = newDetail;
- SegmentStatusManager.writeLoadDetailsIntoFile(
- CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails);
- return newDetail.getLoadName();
+ return createNewSegment(table, details);
} else {
return streamSegment.getLoadName();
}
@@ -131,6 +115,27 @@ public class StreamSegment {
}
}
+ private static String createNewSegment(CarbonTable table, LoadMetadataDetails[] details)
+ throws IOException {
+ int segmentId = SegmentStatusManager.createNewSegmentId(details);
+ LoadMetadataDetails newDetail = new LoadMetadataDetails();
+ newDetail.setLoadName(String.valueOf(segmentId));
+ newDetail.setFileFormat(FileFormat.ROW_V1);
+ newDetail.setLoadStartTime(System.currentTimeMillis());
+ newDetail.setSegmentStatus(SegmentStatus.STREAMING);
+
+ LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
+ int i = 0;
+ for (; i < details.length; i++) {
+ newDetails[i] = details[i];
+ }
+ newDetails[i] = newDetail;
+ SegmentStatusManager
+ .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath()),
+ newDetails);
+ return newDetail.getLoadName();
+ }
+
/**
* marker old stream segment to finished status and create new stream segment
*/
@@ -155,38 +160,21 @@ public class StreamSegment {
break;
}
}
-
- int newSegmentId = SegmentStatusManager.createNewSegmentId(details);
- LoadMetadataDetails newDetail = new LoadMetadataDetails();
- newDetail.setLoadName(String.valueOf(newSegmentId));
- newDetail.setFileFormat(FileFormat.ROW_V1);
- newDetail.setLoadStartTime(System.currentTimeMillis());
- newDetail.setSegmentStatus(SegmentStatus.STREAMING);
-
- LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
- int i = 0;
- for (; i < details.length; i++) {
- newDetails[i] = details[i];
- }
- newDetails[i] = newDetail;
- SegmentStatusManager
- .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
- table.getTablePath()), newDetails);
- return newDetail.getLoadName();
+ return createNewSegment(table, details);
} else {
LOGGER.error(
- "Not able to acquire the lock for stream table status updation for table " + table
+ "Not able to acquire the status update lock for streaming table " + table
.getDatabaseName() + "." + table.getTableName());
throw new IOException("Failed to get stream segment");
}
} finally {
if (carbonLock.unlock()) {
LOGGER.info(
- "Table unlocked successfully after table status updation" + table.getDatabaseName()
+ "Table unlocked successfully after table status update" + table.getDatabaseName()
+ "." + table.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
- .getTableName() + " during table status updation");
+ .getTableName() + " during table status update");
}
}
}
@@ -223,11 +211,11 @@ public class StreamSegment {
}
} finally {
if (statusLock.unlock()) {
- LOGGER.info("Table unlocked successfully after table status updation"
+ LOGGER.info("Table unlocked successfully after table status update"
+ carbonTable.getDatabaseName() + "." + carbonTable.getTableName());
} else {
LOGGER.error("Unable to unlock Table lock for table " + carbonTable.getDatabaseName()
- + "." + carbonTable.getTableName() + " during table status updation");
+ + "." + carbonTable.getTableName() + " during table status update");
}
}
}
@@ -268,9 +256,7 @@ public class StreamSegment {
*/
private static StreamFileIndex createStreamBlockIndex(String fileName,
BlockletMinMaxIndex minMaxIndex, int blockletRowCount) {
- StreamFileIndex streamFileIndex =
- new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
- return streamFileIndex;
+ return new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
}
/**
@@ -400,12 +386,7 @@ public class StreamSegment {
public static CarbonFile[] listDataFiles(String segmentDir) {
CarbonFile carbonDir = FileFactory.getCarbonFile(segmentDir);
if (carbonDir.exists()) {
- return carbonDir.listFiles(new CarbonFileFilter() {
- @Override
- public boolean accept(CarbonFile file) {
- return CarbonTablePath.isCarbonDataFile(file.getName());
- }
- });
+ return carbonDir.listFiles(file -> CarbonTablePath.isCarbonDataFile(file.getName()));
} else {
return new CarbonFile[0];
}
@@ -415,9 +396,8 @@ public class StreamSegment {
* read index file to list BlockIndex
*
* @param indexPath path of the index file
- * @param fileType file type of the index file
* @return the list of BlockIndex in the index file
- * @throws IOException
+ * @throws IOException failed to read index file
*/
public static List<BlockIndex> readIndexFile(String indexPath)
throws IOException {
@@ -460,10 +440,7 @@ public class StreamSegment {
return;
}
- SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
- for (int index = 0; index < comparators.length; index++) {
- comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
- }
+ SerializableComparator[] comparators = getSerializableComparators(msrDataTypes);
// min value
byte[][] minValues = minMaxIndex.getMinValues();
@@ -475,23 +452,7 @@ public class StreamSegment {
if (minValues.length != mergedMinValues.length) {
throw new IOException("the lengths of the min values should be same.");
}
- int dimCount = minValues.length - msrDataTypes.length;
- for (int index = 0; index < minValues.length; index++) {
- if (index < dimCount) {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
- > 0) {
- minValues[index] = mergedMinValues[index];
- }
- } else {
- Object object = DataTypeUtil.getMeasureObjectFromDataType(
- minValues[index], msrDataTypes[index - dimCount]);
- Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
- mergedMinValues[index], msrDataTypes[index - dimCount]);
- if (comparators[index - dimCount].compare(object, mergedObject) > 0) {
- minValues[index] = mergedMinValues[index];
- }
- }
- }
+ mergeMinValues(msrDataTypes, comparators, minValues, mergedMinValues);
}
// max value
@@ -503,21 +464,55 @@ public class StreamSegment {
if (maxValues.length != mergedMaxValues.length) {
throw new IOException("the lengths of the max values should be same.");
}
- int dimCount = maxValues.length - msrDataTypes.length;
- for (int index = 0; index < maxValues.length; index++) {
- if (index < dimCount) {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
- < 0) {
- maxValues[index] = mergedMaxValues[index];
- }
- } else {
- Object object = DataTypeUtil.getMeasureObjectFromDataType(
- maxValues[index], msrDataTypes[index - dimCount]);
- Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
- mergedMaxValues[index], msrDataTypes[index - dimCount]);
- if (comparators[index - dimCount].compare(object, mergedObject) < 0) {
- maxValues[index] = mergedMaxValues[index];
- }
+ mergeMaxValues(msrDataTypes, comparators, maxValues, mergedMaxValues);
+ }
+ }
+
+ private static SerializableComparator[] getSerializableComparators(DataType[] msrDataTypes) {
+ SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
+ for (int index = 0; index < comparators.length; index++) {
+ comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
+ }
+ return comparators;
+ }
+
+ private static void mergeMaxValues(DataType[] msrDataTypes, SerializableComparator[] comparators,
+ byte[][] maxValues, byte[][] mergedMaxValues) {
+ int dimCount = maxValues.length - msrDataTypes.length;
+ for (int index = 0; index < maxValues.length; index++) {
+ if (index < dimCount) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
+ < 0) {
+ maxValues[index] = mergedMaxValues[index];
+ }
+ } else {
+ Object object = DataTypeUtil
+ .getMeasureObjectFromDataType(maxValues[index], msrDataTypes[index - dimCount]);
+ Object mergedObject = DataTypeUtil
+ .getMeasureObjectFromDataType(mergedMaxValues[index], msrDataTypes[index - dimCount]);
+ if (comparators[index - dimCount].compare(object, mergedObject) < 0) {
+ maxValues[index] = mergedMaxValues[index];
+ }
+ }
+ }
+ }
+
+ private static void mergeMinValues(DataType[] msrDataTypes, SerializableComparator[] comparators,
+ byte[][] minValues, byte[][] mergedMinValues) {
+ int dimCount = minValues.length - msrDataTypes.length;
+ for (int index = 0; index < minValues.length; index++) {
+ if (index < dimCount) {
+ if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
+ > 0) {
+ minValues[index] = mergedMinValues[index];
+ }
+ } else {
+ Object object = DataTypeUtil
+ .getMeasureObjectFromDataType(minValues[index], msrDataTypes[index - dimCount]);
+ Object mergedObject = DataTypeUtil
+ .getMeasureObjectFromDataType(mergedMinValues[index], msrDataTypes[index - dimCount]);
+ if (comparators[index - dimCount].compare(object, mergedObject) > 0) {
+ minValues[index] = mergedMinValues[index];
}
}
}
@@ -535,52 +530,17 @@ public class StreamSegment {
return to;
}
- SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
- for (int index = 0; index < comparators.length; index++) {
- comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
- }
+ SerializableComparator[] comparators = getSerializableComparators(msrDataTypes);
// min value
byte[][] minValues = to.getMinValues();
byte[][] mergedMinValues = from.getMinValues();
- int dimCount1 = minValues.length - msrDataTypes.length;
- for (int index = 0; index < minValues.length; index++) {
- if (index < dimCount1) {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
- > 0) {
- minValues[index] = mergedMinValues[index];
- }
- } else {
- Object object = DataTypeUtil.getMeasureObjectFromDataType(
- minValues[index], msrDataTypes[index - dimCount1]);
- Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
- mergedMinValues[index], msrDataTypes[index - dimCount1]);
- if (comparators[index - dimCount1].compare(object, mergedObject) > 0) {
- minValues[index] = mergedMinValues[index];
- }
- }
- }
+ mergeMinValues(msrDataTypes, comparators, minValues, mergedMinValues);
// max value
byte[][] maxValues = to.getMaxValues();
byte[][] mergedMaxValues = from.getMaxValues();
- int dimCount2 = maxValues.length - msrDataTypes.length;
- for (int index = 0; index < maxValues.length; index++) {
- if (index < dimCount2) {
- if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
- < 0) {
- maxValues[index] = mergedMaxValues[index];
- }
- } else {
- Object object = DataTypeUtil.getMeasureObjectFromDataType(
- maxValues[index], msrDataTypes[index - dimCount2]);
- Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
- mergedMaxValues[index], msrDataTypes[index - dimCount2]);
- if (comparators[index - dimCount2].compare(object, mergedObject) < 0) {
- maxValues[index] = mergedMaxValues[index];
- }
- }
- }
+ mergeMaxValues(msrDataTypes, comparators, maxValues, mergedMaxValues);
return to;
}
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index 9393773..ef3853c 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -25,7 +25,7 @@ import java.util.Base64
import org.apache.carbondata.core.constants.CarbonCommonConstants
object FieldConverter {
- val stringLengthExceedErrorMsg = "Dataload failed, String length cannot exceed "
+ val stringLengthExceedErrorMsg = "Data load failed, String length cannot exceed "
/**
* Return a String representation of the input value
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 03ca09e..21ce13f 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -35,15 +35,15 @@ import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConst
*/
class RowStreamParserImp extends CarbonStreamParser {
- var configuration: Configuration = null
- var isVarcharTypeMapping: Array[Boolean] = null
- var structType: StructType = null
- var encoder: ExpressionEncoder[Row] = null
+ var configuration: Configuration = _
+ var isVarcharTypeMapping: Array[Boolean] = _
+ var structType: StructType = _
+ var encoder: ExpressionEncoder[Row] = _
- var timeStampFormat: SimpleDateFormat = null
- var dateFormat: SimpleDateFormat = null
- var complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
- var serializationNullFormat: String = null
+ var timeStampFormat: SimpleDateFormat = _
+ var dateFormat: SimpleDateFormat = _
+ val complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
+ var serializationNullFormat: String = _
override def initialize(configuration: Configuration,
structType: StructType, isVarcharTypeMapping: Array[Boolean]): Unit = {
diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
index 17602f0..d0e5c95 100644
--- a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
+++ b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
@@ -82,7 +82,7 @@ public class CarbonStreamOutputFormatTest extends TestCase {
try {
CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
} catch (IOException e) {
- Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false);
+ Assert.fail("Failed to config CarbonLoadModel for CarbonStreamOutputFormat");
}
}
@@ -92,11 +92,11 @@ public class CarbonStreamOutputFormatTest extends TestCase {
CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
Assert.assertNotNull("Failed to get CarbonLoadModel", model);
- Assert.assertTrue("CarbonLoadModel should be same with previous",
- carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+ Assert.assertEquals("CarbonLoadModel should be same with previous",
+ carbonLoadModel.getFactTimeStamp(), model.getFactTimeStamp());
} catch (IOException e) {
- Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
+ Assert.fail("Failed to get CarbonLoadModel for CarbonStreamOutputFormat");
}
}
@@ -106,11 +106,11 @@ public class CarbonStreamOutputFormatTest extends TestCase {
CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
TaskAttemptContext taskAttemptContext =
new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
- RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+ RecordWriter<Void, Object> recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
} catch (Exception e) {
e.printStackTrace();
- Assert.assertTrue(e.getMessage(), false);
+ Assert.fail(e.getMessage());
}
}