Posted to commits@carbondata.apache.org by ma...@apache.org on 2020/07/19 12:00:58 UTC

[carbondata] branch master updated: [CARBONDATA-3889] Cleanup code for carbondata-streaming module

This is an automated email from the ASF dual-hosted git repository.

manhua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 23e2760  [CARBONDATA-3889] Cleanup code for carbondata-streaming module
23e2760 is described below

commit 23e2760981e7674c88eae8319ec102d5b9adb544
Author: QiangCai <qi...@qq.com>
AuthorDate: Sat Jul 4 23:37:21 2020 +0800

    [CARBONDATA-3889] Cleanup code for carbondata-streaming module
    
    Why is this PR needed?
    The carbondata-streaming module contains duplicated logic, typos in
    identifiers and messages, non-final fields, and verbose boilerplate
    that should be cleaned up.
    
    What changes were proposed in this PR?
    Clean up code in the carbondata-streaming module: add the missing
    license header to streaming/pom.xml, mark effectively-constant fields
    final, fix typos (e.g. apppendBlocklet -> appendBlocklet), extract the
    duplicated segment-creation and min/max-merge logic in StreamSegment
    into helper methods, replace an anonymous CarbonFileFilter with a
    lambda, use Scala default initializers instead of null, and tighten
    the test assertions.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3826
---
 streaming/pom.xml                                  |  19 +-
 .../streaming/CarbonStreamRecordWriter.java        |   6 +-
 .../carbondata/streaming/StreamBlockletWriter.java |  22 +--
 .../streaming/segment/StreamSegment.java           | 210 +++++++++------------
 .../streaming/parser/FieldConverter.scala          |   2 +-
 .../streaming/parser/RowStreamParserImp.scala      |  16 +-
 .../streaming/CarbonStreamOutputFormatTest.java    |  12 +-
 7 files changed, 132 insertions(+), 155 deletions(-)

diff --git a/streaming/pom.xml b/streaming/pom.xml
index 92cbed4..84fffea 100644
--- a/streaming/pom.xml
+++ b/streaming/pom.xml
@@ -1,3 +1,20 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
   <parent>
     <artifactId>carbondata-parent</artifactId>
@@ -95,7 +112,7 @@
         <groupId>org.apache.maven.plugins</groupId>
         <artifactId>maven-surefire-plugin</artifactId>
         <version>2.18</version>
-        <!-- Note config is repeated in scalatest config -->
+        <!-- Note config is repeated in scala test config -->
         <configuration>
           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
           <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
index 3209e8d..8db1bd7 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/CarbonStreamRecordWriter.java
@@ -82,7 +82,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
   private RowParser rowParser;
   private BadRecordsLogger badRecordLogger;
   private RowConverter converter;
-  private CarbonRow currentRow = new CarbonRow(null);
+  private final CarbonRow currentRow = new CarbonRow(null);
 
   // encoder
   private DataField[] dataFields;
@@ -174,7 +174,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     if (carbonFile.exists()) {
       // if the file is existed, use the append api
       outputStream = FileFactory.getDataOutputStreamUsingAppend(filePath);
-      // get the compressor from the fileheader. In legacy store,
+      // get the compressor from the file header. In legacy store,
       // the compressor name is not set and it use snappy compressor
       FileHeader header = new CarbonHeaderReader(filePath).readHeader();
       if (header.isSetCompressor_name()) {
@@ -329,7 +329,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
     if (output.getRowIndex() == -1) {
       return;
     }
-    output.apppendBlocklet(outputStream);
+    output.appendBlocklet(outputStream);
     outputStream.flush();
     if (!isClosed) {
       batchMinMaxIndex = StreamSegment.mergeBlockletMinMax(
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
index 89bf7c5..0391525 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/StreamBlockletWriter.java
@@ -44,14 +44,14 @@ public class StreamBlockletWriter {
   private byte[] buffer;
   private int maxSize;
   private int maxRowNum;
-  private int rowSize;
+  private final int rowSize;
   private int count = 0;
   private int rowIndex = -1;
-  private Compressor compressor;
+  private final Compressor compressor;
 
-  private int dimCountWithoutComplex;
-  private int measureCount;
-  private DataType[] measureDataTypes;
+  private final int dimCountWithoutComplex;
+  private final int measureCount;
+  private final DataType[] measureDataTypes;
 
   // blocklet level stats
   ColumnPageStatsCollector[] dimStatsCollectors;
@@ -93,11 +93,11 @@ public class StreamBlockletWriter {
   }
 
   private void ensureCapacity(int space) {
-    int newcount = space + count;
-    if (newcount > buffer.length) {
-      byte[] newbuf = new byte[Math.max(newcount, buffer.length + rowSize)];
-      System.arraycopy(buffer, 0, newbuf, 0, count);
-      buffer = newbuf;
+    int newCount = space + count;
+    if (newCount > buffer.length) {
+      byte[] newBuffer = new byte[Math.max(newCount, buffer.length + rowSize)];
+      System.arraycopy(buffer, 0, newBuffer, 0, count);
+      buffer = newBuffer;
     }
   }
 
@@ -212,7 +212,7 @@ public class StreamBlockletWriter {
     return blockletMinMaxIndex;
   }
 
-  void apppendBlocklet(DataOutputStream outputStream) throws IOException {
+  void appendBlocklet(DataOutputStream outputStream) throws IOException {
     outputStream.write(CarbonStreamOutputFormat.CARBON_SYNC_MARKER);
 
     BlockletInfo blockletInfo = new BlockletInfo();
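
The ensureCapacity hunk above only renames locals, but the method itself is a
standard grow-and-copy buffer expansion that grows by at least one rowSize per
reallocation. A minimal self-contained sketch of the same pattern, with
hypothetical names rather than the real CarbonData fields:

    // Sketch of the grow-and-copy expansion used by StreamBlockletWriter.
    // All names here are illustrative, not CarbonData API.
    public class GrowableBuffer {
      private byte[] buffer = new byte[64];
      private int count = 0;          // bytes currently in use
      private final int rowSize = 16; // minimum growth step, like the writer's rowSize

      private void ensureCapacity(int space) {
        int newCount = space + count;
        if (newCount > buffer.length) {
          // Grow to at least the required size, but never by less than one
          // rowSize, so repeated small appends do not reallocate every time.
          byte[] newBuffer = new byte[Math.max(newCount, buffer.length + rowSize)];
          System.arraycopy(buffer, 0, newBuffer, 0, count);
          buffer = newBuffer;
        }
      }

      public void append(byte[] bytes) {
        ensureCapacity(bytes.length);
        System.arraycopy(bytes, 0, buffer, count, bytes.length);
        count += bytes.length;
      }
    }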
diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
index c40698a..1a2ad0b 100644
--- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
+++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
@@ -94,22 +93,7 @@ public class StreamSegment {
           }
         }
         if (null == streamSegment) {
-          int segmentId = SegmentStatusManager.createNewSegmentId(details);
-          LoadMetadataDetails newDetail = new LoadMetadataDetails();
-          newDetail.setLoadName("" + segmentId);
-          newDetail.setFileFormat(FileFormat.ROW_V1);
-          newDetail.setLoadStartTime(System.currentTimeMillis());
-          newDetail.setSegmentStatus(SegmentStatus.STREAMING);
-
-          LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
-          int i = 0;
-          for (; i < details.length; i++) {
-            newDetails[i] = details[i];
-          }
-          newDetails[i] = newDetail;
-          SegmentStatusManager.writeLoadDetailsIntoFile(
-              CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails);
-          return newDetail.getLoadName();
+          return createNewSegment(table, details);
         } else {
           return streamSegment.getLoadName();
         }
@@ -131,6 +115,27 @@ public class StreamSegment {
     }
   }
 
+  private static String createNewSegment(CarbonTable table, LoadMetadataDetails[] details)
+      throws IOException {
+    int segmentId = SegmentStatusManager.createNewSegmentId(details);
+    LoadMetadataDetails newDetail = new LoadMetadataDetails();
+    newDetail.setLoadName(String.valueOf(segmentId));
+    newDetail.setFileFormat(FileFormat.ROW_V1);
+    newDetail.setLoadStartTime(System.currentTimeMillis());
+    newDetail.setSegmentStatus(SegmentStatus.STREAMING);
+
+    LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
+    int i = 0;
+    for (; i < details.length; i++) {
+      newDetails[i] = details[i];
+    }
+    newDetails[i] = newDetail;
+    SegmentStatusManager
+        .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(table.getTablePath()),
+            newDetails);
+    return newDetail.getLoadName();
+  }
+
   /**
    * marker old stream segment to finished status and create new stream segment
    */
@@ -155,38 +160,21 @@ public class StreamSegment {
             break;
           }
         }
-
-        int newSegmentId = SegmentStatusManager.createNewSegmentId(details);
-        LoadMetadataDetails newDetail = new LoadMetadataDetails();
-        newDetail.setLoadName(String.valueOf(newSegmentId));
-        newDetail.setFileFormat(FileFormat.ROW_V1);
-        newDetail.setLoadStartTime(System.currentTimeMillis());
-        newDetail.setSegmentStatus(SegmentStatus.STREAMING);
-
-        LoadMetadataDetails[] newDetails = new LoadMetadataDetails[details.length + 1];
-        int i = 0;
-        for (; i < details.length; i++) {
-          newDetails[i] = details[i];
-        }
-        newDetails[i] = newDetail;
-        SegmentStatusManager
-            .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(
-                table.getTablePath()), newDetails);
-        return newDetail.getLoadName();
+        return createNewSegment(table, details);
       } else {
         LOGGER.error(
-            "Not able to acquire the lock for stream table status updation for table " + table
+            "Not able to acquire the status update lock for streaming table " + table
                 .getDatabaseName() + "." + table.getTableName());
         throw new IOException("Failed to get stream segment");
       }
     } finally {
       if (carbonLock.unlock()) {
         LOGGER.info(
-            "Table unlocked successfully after table status updation" + table.getDatabaseName()
+            "Table unlocked successfully after table status update" + table.getDatabaseName()
                 + "." + table.getTableName());
       } else {
         LOGGER.error("Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-            .getTableName() + " during table status updation");
+            .getTableName() + " during table status update");
       }
     }
   }
@@ -223,11 +211,11 @@ public class StreamSegment {
       }
     } finally {
       if (statusLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation"
+        LOGGER.info("Table unlocked successfully after table status update"
             + carbonTable.getDatabaseName() + "." + carbonTable.getTableName());
       } else {
         LOGGER.error("Unable to unlock Table lock for table " + carbonTable.getDatabaseName()
-            + "." + carbonTable.getTableName() + " during table status updation");
+            + "." + carbonTable.getTableName() + " during table status update");
       }
     }
   }
@@ -268,9 +256,7 @@ public class StreamSegment {
    */
   private static StreamFileIndex createStreamBlockIndex(String fileName,
       BlockletMinMaxIndex minMaxIndex, int blockletRowCount) {
-    StreamFileIndex streamFileIndex =
-        new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
-    return streamFileIndex;
+    return new StreamFileIndex(fileName, minMaxIndex, blockletRowCount);
   }
 
   /**
@@ -400,12 +386,7 @@ public class StreamSegment {
   public static CarbonFile[] listDataFiles(String segmentDir) {
     CarbonFile carbonDir = FileFactory.getCarbonFile(segmentDir);
     if (carbonDir.exists()) {
-      return carbonDir.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile file) {
-          return CarbonTablePath.isCarbonDataFile(file.getName());
-        }
-      });
+      return carbonDir.listFiles(file -> CarbonTablePath.isCarbonDataFile(file.getName()));
     } else {
       return new CarbonFile[0];
     }
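
The listDataFiles change above replaces an anonymous CarbonFileFilter with a
lambda, which works because CarbonFileFilter declares a single abstract method.
A generic sketch of the same conversion, using a hypothetical stand-in
interface rather than the real CarbonData type:

    public class LambdaConversionSketch {
      // Stand-in single-abstract-method interface, analogous to CarbonFileFilter.
      interface NameFilter {
        boolean accept(String name);
      }

      public static void main(String[] args) {
        // Before: anonymous inner class.
        NameFilter verbose = new NameFilter() {
          @Override
          public boolean accept(String name) {
            return name.endsWith(".carbondata");
          }
        };
        // After: a lambda expresses the same filter in one line.
        NameFilter concise = name -> name.endsWith(".carbondata");

        System.out.println(verbose.accept("part-0.carbondata")); // true
        System.out.println(concise.accept("0.carbonindex"));     // false
      }
    }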
@@ -415,9 +396,8 @@ public class StreamSegment {
    * read index file to list BlockIndex
    *
    * @param indexPath path of the index file
-   * @param fileType  file type of the index file
    * @return the list of BlockIndex in the index file
-   * @throws IOException
+   * @throws IOException failed to read index file
    */
   public static List<BlockIndex> readIndexFile(String indexPath)
       throws IOException {
@@ -460,10 +440,7 @@ public class StreamSegment {
       return;
     }
 
-    SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
-    for (int index = 0; index < comparators.length; index++) {
-      comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
-    }
+    SerializableComparator[] comparators = getSerializableComparators(msrDataTypes);
 
     // min value
     byte[][] minValues = minMaxIndex.getMinValues();
@@ -475,23 +452,7 @@ public class StreamSegment {
       if (minValues.length != mergedMinValues.length) {
         throw new IOException("the lengths of the min values should be same.");
       }
-      int dimCount = minValues.length - msrDataTypes.length;
-      for (int index = 0; index < minValues.length; index++) {
-        if (index < dimCount) {
-          if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
-              > 0) {
-            minValues[index] = mergedMinValues[index];
-          }
-        } else {
-          Object object = DataTypeUtil.getMeasureObjectFromDataType(
-              minValues[index], msrDataTypes[index - dimCount]);
-          Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
-              mergedMinValues[index], msrDataTypes[index - dimCount]);
-          if (comparators[index - dimCount].compare(object, mergedObject) > 0) {
-            minValues[index] = mergedMinValues[index];
-          }
-        }
-      }
+      mergeMinValues(msrDataTypes, comparators, minValues, mergedMinValues);
     }
 
     // max value
@@ -503,21 +464,55 @@ public class StreamSegment {
       if (maxValues.length != mergedMaxValues.length) {
         throw new IOException("the lengths of the max values should be same.");
       }
-      int dimCount = maxValues.length - msrDataTypes.length;
-      for (int index = 0; index < maxValues.length; index++) {
-        if (index < dimCount) {
-          if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
-              < 0) {
-            maxValues[index] = mergedMaxValues[index];
-          }
-        } else {
-          Object object = DataTypeUtil.getMeasureObjectFromDataType(
-              maxValues[index], msrDataTypes[index - dimCount]);
-          Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
-              mergedMaxValues[index], msrDataTypes[index - dimCount]);
-          if (comparators[index - dimCount].compare(object, mergedObject) < 0) {
-            maxValues[index] = mergedMaxValues[index];
-          }
+      mergeMaxValues(msrDataTypes, comparators, maxValues, mergedMaxValues);
+    }
+  }
+
+  private static SerializableComparator[] getSerializableComparators(DataType[] msrDataTypes) {
+    SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
+    for (int index = 0; index < comparators.length; index++) {
+      comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
+    }
+    return comparators;
+  }
+
+  private static void mergeMaxValues(DataType[] msrDataTypes, SerializableComparator[] comparators,
+      byte[][] maxValues, byte[][] mergedMaxValues) {
+    int dimCount = maxValues.length - msrDataTypes.length;
+    for (int index = 0; index < maxValues.length; index++) {
+      if (index < dimCount) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
+            < 0) {
+          maxValues[index] = mergedMaxValues[index];
+        }
+      } else {
+        Object object = DataTypeUtil
+            .getMeasureObjectFromDataType(maxValues[index], msrDataTypes[index - dimCount]);
+        Object mergedObject = DataTypeUtil
+            .getMeasureObjectFromDataType(mergedMaxValues[index], msrDataTypes[index - dimCount]);
+        if (comparators[index - dimCount].compare(object, mergedObject) < 0) {
+          maxValues[index] = mergedMaxValues[index];
+        }
+      }
+    }
+  }
+
+  private static void mergeMinValues(DataType[] msrDataTypes, SerializableComparator[] comparators,
+      byte[][] minValues, byte[][] mergedMinValues) {
+    int dimCount = minValues.length - msrDataTypes.length;
+    for (int index = 0; index < minValues.length; index++) {
+      if (index < dimCount) {
+        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
+            > 0) {
+          minValues[index] = mergedMinValues[index];
+        }
+      } else {
+        Object object = DataTypeUtil
+            .getMeasureObjectFromDataType(minValues[index], msrDataTypes[index - dimCount]);
+        Object mergedObject = DataTypeUtil
+            .getMeasureObjectFromDataType(mergedMinValues[index], msrDataTypes[index - dimCount]);
+        if (comparators[index - dimCount].compare(object, mergedObject) > 0) {
+          minValues[index] = mergedMinValues[index];
         }
       }
     }
@@ -535,52 +530,17 @@ public class StreamSegment {
       return to;
     }
 
-    SerializableComparator[] comparators = new SerializableComparator[msrDataTypes.length];
-    for (int index = 0; index < comparators.length; index++) {
-      comparators[index] = Comparator.getComparatorByDataTypeForMeasure(msrDataTypes[index]);
-    }
+    SerializableComparator[] comparators = getSerializableComparators(msrDataTypes);
 
     // min value
     byte[][] minValues = to.getMinValues();
     byte[][] mergedMinValues = from.getMinValues();
-    int dimCount1 = minValues.length - msrDataTypes.length;
-    for (int index = 0; index < minValues.length; index++) {
-      if (index < dimCount1) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(minValues[index], mergedMinValues[index])
-            > 0) {
-          minValues[index] = mergedMinValues[index];
-        }
-      } else {
-        Object object = DataTypeUtil.getMeasureObjectFromDataType(
-            minValues[index], msrDataTypes[index - dimCount1]);
-        Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
-            mergedMinValues[index], msrDataTypes[index - dimCount1]);
-        if (comparators[index - dimCount1].compare(object, mergedObject) > 0) {
-          minValues[index] = mergedMinValues[index];
-        }
-      }
-    }
+    mergeMinValues(msrDataTypes, comparators, minValues, mergedMinValues);
 
     // max value
     byte[][] maxValues = to.getMaxValues();
     byte[][] mergedMaxValues = from.getMaxValues();
-    int dimCount2 = maxValues.length - msrDataTypes.length;
-    for (int index = 0; index < maxValues.length; index++) {
-      if (index < dimCount2) {
-        if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(maxValues[index], mergedMaxValues[index])
-            < 0) {
-          maxValues[index] = mergedMaxValues[index];
-        }
-      } else {
-        Object object = DataTypeUtil.getMeasureObjectFromDataType(
-            maxValues[index], msrDataTypes[index - dimCount2]);
-        Object mergedObject = DataTypeUtil.getMeasureObjectFromDataType(
-            mergedMaxValues[index], msrDataTypes[index - dimCount2]);
-        if (comparators[index - dimCount2].compare(object, mergedObject) < 0) {
-          maxValues[index] = mergedMaxValues[index];
-        }
-      }
-    }
+    mergeMaxValues(msrDataTypes, comparators, maxValues, mergedMaxValues);
     return to;
   }
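
A side note on the extracted createNewSegment helper earlier in this file: it
appends the new LoadMetadataDetails entry by allocating a larger array and
copying with a manual loop. java.util.Arrays offers a more compact equivalent;
a sketch of that alternative (not what this commit uses):

    import java.util.Arrays;

    public class AppendSketch {
      // Append one element to an array by copy, equivalent to the manual
      // loop in createNewSegment but expressed with Arrays.copyOf.
      static <T> T[] append(T[] details, T newDetail) {
        T[] newDetails = Arrays.copyOf(details, details.length + 1);
        newDetails[details.length] = newDetail;
        return newDetails;
      }

      public static void main(String[] args) {
        String[] segments = {"0", "1"};
        System.out.println(Arrays.toString(append(segments, "2"))); // [0, 1, 2]
      }
    }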
 
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index 9393773..ef3853c 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -25,7 +25,7 @@ import java.util.Base64
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 
 object FieldConverter {
-  val stringLengthExceedErrorMsg = "Dataload failed, String length cannot exceed "
+  val stringLengthExceedErrorMsg = "Data load failed, String length cannot exceed "
 
   /**
    * Return a String representation of the input value
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
index 03ca09e..21ce13f 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/RowStreamParserImp.scala
@@ -35,15 +35,15 @@ import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConst
  */
 class RowStreamParserImp extends CarbonStreamParser {
 
-  var configuration: Configuration = null
-  var isVarcharTypeMapping: Array[Boolean] = null
-  var structType: StructType = null
-  var encoder: ExpressionEncoder[Row] = null
+  var configuration: Configuration = _
+  var isVarcharTypeMapping: Array[Boolean] = _
+  var structType: StructType = _
+  var encoder: ExpressionEncoder[Row] = _
 
-  var timeStampFormat: SimpleDateFormat = null
-  var dateFormat: SimpleDateFormat = null
-  var complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
-  var serializationNullFormat: String = null
+  var timeStampFormat: SimpleDateFormat = _
+  var dateFormat: SimpleDateFormat = _
+  val complexDelimiters: util.ArrayList[String] = new util.ArrayList[String]()
+  var serializationNullFormat: String = _
 
   override def initialize(configuration: Configuration,
       structType: StructType, isVarcharTypeMapping: Array[Boolean]): Unit = {
diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
index 17602f0..d0e5c95 100644
--- a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
+++ b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
@@ -82,7 +82,7 @@ public class CarbonStreamOutputFormatTest extends TestCase {
     try {
       CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
     } catch (IOException e) {
-      Assert.assertTrue("Failed to config CarbonLoadModel for CarbonStreamOutputFromat", false);
+      Assert.fail("Failed to config CarbonLoadModel for CarbonStreamOutputFormat");
     }
   }
 
@@ -92,11 +92,11 @@ public class CarbonStreamOutputFormatTest extends TestCase {
       CarbonLoadModel model = CarbonStreamOutputFormat.getCarbonLoadModel(hadoopConf);
 
       Assert.assertNotNull("Failed to get CarbonLoadModel", model);
-      Assert.assertTrue("CarbonLoadModel should be same with previous",
-          carbonLoadModel.getFactTimeStamp() == model.getFactTimeStamp());
+      Assert.assertEquals("CarbonLoadModel should be same with previous",
+          carbonLoadModel.getFactTimeStamp(), model.getFactTimeStamp());
 
     } catch (IOException e) {
-      Assert.assertTrue("Failed to get CarbonLoadModel for CarbonStreamOutputFromat", false);
+      Assert.fail("Failed to get CarbonLoadModel for CarbonStreamOutputFormat");
     }
   }
 
@@ -106,11 +106,11 @@ public class CarbonStreamOutputFormatTest extends TestCase {
       CarbonStreamOutputFormat.setCarbonLoadModel(hadoopConf, carbonLoadModel);
       TaskAttemptContext taskAttemptContext =
           new TaskAttemptContextImpl(hadoopConf, taskAttemptId);
-      RecordWriter recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
+      RecordWriter<Void, Object> recordWriter = outputFormat.getRecordWriter(taskAttemptContext);
       Assert.assertNotNull("Failed to get CarbonStreamRecordWriter", recordWriter);
     } catch (Exception e) {
       e.printStackTrace();
-      Assert.assertTrue(e.getMessage(), false);
+      Assert.fail(e.getMessage());
     }
   }
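
The test changes above replace Assert.assertTrue(message, false) with
Assert.fail(message), and a boolean comparison inside assertTrue with
assertEquals. A small self-contained JUnit 4 sketch of why that reads better
(illustrative test, not from the repository):

    import java.io.IOException;

    import org.junit.Assert;
    import org.junit.Test;

    public class FailPatternSketchTest {

      // Stand-in for a call such as CarbonStreamOutputFormat.setCarbonLoadModel(...).
      private void mightThrow() throws IOException {
      }

      @Test
      public void testUnexpectedExceptionFailsClearly() {
        try {
          mightThrow();
        } catch (IOException e) {
          // Assert.fail states the intent directly; assertTrue(msg, false)
          // obscures that reaching this branch is itself the failure.
          Assert.fail("unexpected IOException: " + e.getMessage());
        }
        // assertEquals reports expected vs. actual values on failure,
        // unlike assertTrue(msg, a == b), which prints only the message.
        Assert.assertEquals("fact timestamps should match", 42L, 42L);
      }
    }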