You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:28 UTC

[30/47] incubator-carbondata git commit: [CARBONDATA-116]ignore the major compacted segment for minor compaction (#880)

[CARBONDATA-116]ignore the major compacted segment for minor compaction (#880)

Once a segment has been formed by major compaction, that segment must not be considered for minor compaction. This PR skips such segments so that minor compaction remains effective.

* for correcting test case.

* for test case  correction.

* correcting test cases.

* correcting test cases.

* correcting test cases.

* fixing review comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bdc1321d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bdc1321d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bdc1321d

Branch: refs/heads/master
Commit: bdc1321d9c3f42c6bd56f6bf787cfba5bf3fb6b9
Parents: 4d2f684
Author: ravikiran23 <ra...@gmail.com>
Authored: Thu Jul 28 19:39:01 2016 +0530
Committer: sujith71955 <su...@gmail.com>
Committed: Thu Jul 28 19:39:01 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   5 +
 .../core/load/LoadMetadataDetails.java          |  20 +++
 .../spark/merger/CompactionCallable.java        |  36 +----
 .../spark/merger/CarbonDataMergerUtil.java      |  11 +-
 .../execution/command/carbonTableSchema.scala   |   6 +
 .../spark/rdd/CarbonDataRDDFactory.scala        |  15 ++-
 .../org/carbondata/spark/rdd/Compactor.scala    |  30 ++---
 .../MajorCompactionIgnoreInMinorTest.scala      | 130 +++++++++++++++++++
 .../lcm/status/SegmentStatusManager.java        |   2 -
 9 files changed, 201 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 739462e..5852241 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -844,6 +844,11 @@ public final class CarbonCommonConstants {
    */
   public static final int COMPACTION_INMEMORY_RECORD_SIZE = 120000;
 
+  /**
+   * If the level 2 compaction is done in minor then new compacted segment will end with .2
+   */
+  public static String LEVEL2_COMPACTION_INDEX = ".2";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java b/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
index 0250b2e..f0b5ac9 100644
--- a/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
+++ b/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
@@ -56,6 +56,11 @@ public class LoadMetadataDetails implements Serializable {
    */
   private String visibility = "true";
 
+  /**
+   * To know if the segment is a major compacted segment or not.
+   */
+  private String majorCompacted;
+
   public String getPartitionCount() {
     return partitionCount;
   }
@@ -203,4 +208,19 @@ public class LoadMetadataDetails implements Serializable {
     this.visibility = visibility;
   }
 
+  /**
+   * Return true if it is a major compacted segment.
+   * @return
+   */
+  public String isMajorCompacted() {
+    return majorCompacted;
+  }
+
+  /**
+   * Set true if it is a major compacted segment.
+   * @param majorCompacted
+   */
+  public void setMajorCompacted(String majorCompacted) {
+    this.majorCompacted = majorCompacted;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java b/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
index 7289e61..b90d46e 100644
--- a/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
+++ b/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
@@ -17,51 +17,27 @@
 
 package org.carbondata.integration.spark.merger;
 
-import java.util.List;
 import java.util.concurrent.Callable;
 
-import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.carbondata.core.load.LoadMetadataDetails;
-import org.carbondata.spark.load.CarbonLoadModel;
 import org.carbondata.spark.rdd.Compactor;
 
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.execution.command.Partitioner;
+import org.apache.spark.sql.execution.command.CompactionCallableModel;
 
 /**
- *
+ * Callable class which is used to trigger the compaction in a separate callable.
  */
 public class CompactionCallable implements Callable<Void> {
 
-  private final String hdfsStoreLocation;
-  private final Partitioner partitioner;
-  private final String storeLocation;
-  private final CarbonTable carbonTable;
-  private final String kettleHomePath;
-  private final Long cubeCreationTime;
-  private final List<LoadMetadataDetails> loadsToMerge;
-  private final SQLContext sqlContext;
-  private final CarbonLoadModel carbonLoadModel;
+  private final CompactionCallableModel compactionCallableModel;
 
-  public CompactionCallable(String hdfsStoreLocation, CarbonLoadModel carbonLoadModel,
-      Partitioner partitioner, String storeLocation, CarbonTable carbonTable, String kettleHomePath,
-      Long cubeCreationTime, List<LoadMetadataDetails> loadsToMerge, SQLContext sqlContext) {
+  public CompactionCallable(CompactionCallableModel compactionCallableModel) {
 
-    this.hdfsStoreLocation = hdfsStoreLocation;
-    this.carbonLoadModel = carbonLoadModel;
-    this.partitioner = partitioner;
-    this.storeLocation = storeLocation;
-    this.carbonTable = carbonTable;
-    this.kettleHomePath = kettleHomePath;
-    this.cubeCreationTime = cubeCreationTime;
-    this.loadsToMerge = loadsToMerge;
-    this.sqlContext = sqlContext;
+    this.compactionCallableModel = compactionCallableModel;
   }
 
   @Override public Void call() throws Exception {
 
-    Compactor.triggerCompaction(hdfsStoreLocation, carbonLoadModel, partitioner, storeLocation,
-        carbonTable, kettleHomePath, cubeCreationTime, loadsToMerge, sqlContext);
+    Compactor.triggerCompaction(compactionCallableModel);
     return null;
 
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
index 3aa66c2..f033993 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -128,7 +128,7 @@ public final class CarbonDataMergerUtil {
 
   public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
       String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
-      String mergeLoadStartTime) {
+      String mergeLoadStartTime, CompactionType compactionType) {
 
     boolean tableStatusUpdationStatus = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -176,6 +176,10 @@ public final class CarbonDataMergerUtil {
         loadMetadataDetails.setLoadName(mergedLoadNumber);
         loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
         loadMetadataDetails.setPartitionCount("0");
+        // if this is a major compaction then set the segment as major compaction.
+        if (compactionType == CompactionType.MAJOR_COMPACTION) {
+          loadMetadataDetails.setMajorCompacted("true");
+        }
 
         List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
 
@@ -514,7 +518,10 @@ public final class CarbonDataMergerUtil {
 
       // if a segment is already merged 2 levels then it s name will become .2
       // need to exclude those segments from minor compaction.
-      if (segName.endsWith(".2")) {
+      // if a segment is major compacted then should not be considered for minor.
+      if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || (
+          segment.isMajorCompacted() != null && segment.isMajorCompacted()
+              .equalsIgnoreCase("true"))) {
         continue;
       }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 897759f..1dd066f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -48,6 +48,7 @@ import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.datastorage.store.impl.FileFactory
+import org.carbondata.core.load.LoadMetadataDetails
 import org.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.carbondata.integration.spark.merger.CompactionType
@@ -158,6 +159,11 @@ case class CompactionModel(compactionSize: Long,
   carbonTable: CarbonTable,
   cubeCreationTime: Long)
 
+case class CompactionCallableModel(hdfsStoreLocation: String, carbonLoadModel: CarbonLoadModel,
+  partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String,
+  cubeCreationTime: Long, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext,
+  compactionType: CompactionType)
+
 object TableNewProcessor {
   def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
     new TableNewProcessor(cm, sqlContext).process

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7726739..15de7bf 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileSplit
 import org.apache.spark.{Logging, Partition, SparkContext, SparkEnv}
 import org.apache.spark.sql.{CarbonEnv, CarbonRelation, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
+CompactionModel, Partitioner}
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.util.{FileUtils, SplitUtils}
 
@@ -476,7 +477,7 @@ object CarbonDataRDDFactory extends Logging {
             }
             )
 
-            val future: Future[Void] = executor.submit(new CompactionCallable(hdfsStoreLocation,
+            val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
               carbonLoadModel,
               partitioner,
               storeLocation,
@@ -484,9 +485,13 @@ object CarbonDataRDDFactory extends Logging {
               kettleHomePath,
               compactionModel.cubeCreationTime,
               loadsToMerge,
-              sqlContext
-            )
-            )
+              sqlContext,
+              compactionModel.compactionType)
+
+            val future: Future[Void] = executor
+              .submit(new CompactionCallable(compactionCallableModel
+              )
+              )
             futureList.add(future)
             segList = CarbonDataMergerUtil
               .filterOutAlreadyMergedSegments(segList, loadsToMerge)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
index 2680d7d..7e19ded 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
@@ -19,17 +19,14 @@ package org.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, Partitioner}
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
 
 import org.carbondata.common.logging.LogServiceFactory
 import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
 import org.carbondata.core.constants.CarbonCommonConstants
-import org.carbondata.core.load.LoadMetadataDetails
 import org.carbondata.core.util.CarbonProperties
 import org.carbondata.lcm.status.SegmentStatusManager
-import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
+import org.carbondata.spark.load.CarbonLoaderUtil
 import org.carbondata.spark.MergeResultImpl
 import org.carbondata.spark.merger.CarbonDataMergerUtil
 
@@ -40,15 +37,18 @@ object Compactor {
 
   val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
 
-  def triggerCompaction(hdfsStoreLocation: String,
-    carbonLoadModel: CarbonLoadModel,
-    partitioner: Partitioner,
-    storeLocation: String,
-    carbonTable: CarbonTable,
-    kettleHomePath: String,
-    cubeCreationTime: Long,
-    loadsToMerge: java.util.List[LoadMetadataDetails],
-    sc: SQLContext): Unit = {
+  def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+
+    val hdfsStoreLocation = compactionCallableModel.hdfsStoreLocation
+    val partitioner = compactionCallableModel.partitioner
+    val storeLocation = compactionCallableModel.storeLocation
+    val carbonTable = compactionCallableModel.carbonTable
+    val kettleHomePath = compactionCallableModel.kettleHomePath
+    val cubeCreationTime = compactionCallableModel.cubeCreationTime
+    val loadsToMerge = compactionCallableModel.loadsToMerge
+    val sc = compactionCallableModel.sqlContext
+    val carbonLoadModel = compactionCallableModel.carbonLoadModel
+    val compactionType = compactionCallableModel.compactionType
 
     val startTime = System.nanoTime();
     val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
@@ -117,7 +117,7 @@ object Compactor {
       logger.info("time taken to merge " + mergedLoadName + " is " + (endTime - startTime))
       CarbonDataMergerUtil
         .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath(),
-          mergedLoadName, carbonLoadModel, mergeLoadStartTime
+          mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
         )
       logger
         .audit("Compaction request completed for table " + carbonLoadModel

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
new file mode 100644
index 0000000..1005e83
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -0,0 +1,130 @@
+package org.carbondata.spark.testsuite.datacompaction
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
+import org.carbondata.lcm.status.SegmentStatusManager
+import org.scalatest.BeforeAndAfterAll
+
+import scala.collection.JavaConverters._
+
+/**
+  * FT for compaction scenario where major segment should not be included in minor.
+  */
+class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    sql("drop table if exists  ignoremajor")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS ignoremajor (country String, ID Int, date Timestamp, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+        ".format'"
+    )
+
+
+    val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+      .getCanonicalPath
+    val csvFilePath1 = currentDirectory + "/src/test/resources/compaction/compaction1.csv"
+
+    val csvFilePath2 = currentDirectory + "/src/test/resources/compaction/compaction2.csv"
+    val csvFilePath3 = currentDirectory + "/src/test/resources/compaction/compaction3.csv"
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    // compaction will happen here.
+    sql("alter table ignoremajor compact 'major'"
+    )
+    if (checkCompactionCompletedOrNot("0.1")) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor  OPTIONS" +
+        "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+      sql("alter table ignoremajor compact 'minor'"
+      )
+      if (checkCompactionCompletedOrNot("2.1")) {
+        sql("alter table ignoremajor compact 'minor'"
+        )
+      }
+
+    }
+
+  }
+
+  /**
+    * Check if the compaction is completed or not.
+    *
+    * @param requiredSeg
+    * @return
+    */
+  def checkCompactionCompletedOrNot(requiredSeg: String): Boolean = {
+    var status = false
+    var noOfRetries = 0
+    while (!status && noOfRetries < 10) {
+
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+          AbsoluteTableIdentifier(
+            CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+            new CarbonTableIdentifier("default", "ignoremajor", noOfRetries + "")
+          )
+      )
+      val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+      segments.foreach(seg =>
+        System.out.println( "valid segment is =" + seg)
+      )
+
+      if (!segments.contains(requiredSeg)) {
+        // wait for 2 seconds for compaction to complete.
+        System.out.println("sleping for 2 seconds.")
+        Thread.sleep(2000)
+        noOfRetries += 1
+      }
+      else {
+        status = true
+      }
+    }
+    return status
+  }
+
+  /**
+    * Test whether major compaction is not included in minor compaction.
+    */
+  test("delete merged folder and check segments") {
+    // delete merged segments
+    sql("clean files for table ignoremajor")
+
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+        AbsoluteTableIdentifier(
+          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+          new CarbonTableIdentifier("default", "ignoremajor", "rrr")
+        )
+    )
+    // merged segment should not be there
+    val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+    assert(segments.contains("0.1"))
+    assert(segments.contains("2.1"))
+    assert(!segments.contains("2"))
+    assert(!segments.contains("3"))
+
+  }
+
+  override def afterAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
index e6220d8..b35bc06 100644
--- a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
@@ -107,7 +107,6 @@ public class SegmentStatusManager {
         .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
             absoluteTableIdentifier.getCarbonTableIdentifier());
     String dataPath = carbonTablePath.getTableStatusFilePath();
-
     DataInputStream dataInputStream = null;
     Gson gsonObjectToRead = new Gson();
     AtomicFileOperations fileOperation =
@@ -134,7 +133,6 @@ public class SegmentStatusManager {
               .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
             // check for merged loads.
             if (null != loadMetadataDetails.getMergedLoadName()) {
-
               if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
                 listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
               }