You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:28 UTC
[30/47] incubator-carbondata git commit: [CARBONDATA-116]ignore the
major compacted segment for minor compaction (#880)
[CARBONDATA-116]ignore the major compacted segment for minor compaction (#880)
Once a segment has been formed by major compaction, that segment should not be considered for minor compaction again. This PR skips such segments during minor compaction so that compaction remains effective.
* for correcting test case.
* for test case correction.
* correcting test cases.
* correcting test cases.
* correcting test cases.
* fixing review comments.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/bdc1321d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/bdc1321d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/bdc1321d
Branch: refs/heads/master
Commit: bdc1321d9c3f42c6bd56f6bf787cfba5bf3fb6b9
Parents: 4d2f684
Author: ravikiran23 <ra...@gmail.com>
Authored: Thu Jul 28 19:39:01 2016 +0530
Committer: sujith71955 <su...@gmail.com>
Committed: Thu Jul 28 19:39:01 2016 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 5 +
.../core/load/LoadMetadataDetails.java | 20 +++
.../spark/merger/CompactionCallable.java | 36 +----
.../spark/merger/CarbonDataMergerUtil.java | 11 +-
.../execution/command/carbonTableSchema.scala | 6 +
.../spark/rdd/CarbonDataRDDFactory.scala | 15 ++-
.../org/carbondata/spark/rdd/Compactor.scala | 30 ++---
.../MajorCompactionIgnoreInMinorTest.scala | 130 +++++++++++++++++++
.../lcm/status/SegmentStatusManager.java | 2 -
9 files changed, 201 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 739462e..5852241 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -844,6 +844,11 @@ public final class CarbonCommonConstants {
*/
public static final int COMPACTION_INMEMORY_RECORD_SIZE = 120000;
+ /**
+ * If the level 2 compaction is done in minor then new compacted segment will end with .2
+ */
+ public static String LEVEL2_COMPACTION_INDEX = ".2";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java b/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
index 0250b2e..f0b5ac9 100644
--- a/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
+++ b/core/src/main/java/org/carbondata/core/load/LoadMetadataDetails.java
@@ -56,6 +56,11 @@ public class LoadMetadataDetails implements Serializable {
*/
private String visibility = "true";
+ /**
+ * To know if the segment is a major compacted segment or not.
+ */
+ private String majorCompacted;
+
public String getPartitionCount() {
return partitionCount;
}
@@ -203,4 +208,19 @@ public class LoadMetadataDetails implements Serializable {
this.visibility = visibility;
}
+ /**
+ * Return true if it is a major compacted segment.
+ * @return
+ */
+ public String isMajorCompacted() {
+ return majorCompacted;
+ }
+
+ /**
+ * Set true if it is a major compacted segment.
+ * @param majorCompacted
+ */
+ public void setMajorCompacted(String majorCompacted) {
+ this.majorCompacted = majorCompacted;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java b/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
index 7289e61..b90d46e 100644
--- a/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
+++ b/integration/spark/src/main/java/org/carbondata/integration/spark/merger/CompactionCallable.java
@@ -17,51 +17,27 @@
package org.carbondata.integration.spark.merger;
-import java.util.List;
import java.util.concurrent.Callable;
-import org.carbondata.core.carbon.metadata.schema.table.CarbonTable;
-import org.carbondata.core.load.LoadMetadataDetails;
-import org.carbondata.spark.load.CarbonLoadModel;
import org.carbondata.spark.rdd.Compactor;
-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.execution.command.Partitioner;
+import org.apache.spark.sql.execution.command.CompactionCallableModel;
/**
- *
+ * Callable class which is used to trigger the compaction in a separate callable.
*/
public class CompactionCallable implements Callable<Void> {
- private final String hdfsStoreLocation;
- private final Partitioner partitioner;
- private final String storeLocation;
- private final CarbonTable carbonTable;
- private final String kettleHomePath;
- private final Long cubeCreationTime;
- private final List<LoadMetadataDetails> loadsToMerge;
- private final SQLContext sqlContext;
- private final CarbonLoadModel carbonLoadModel;
+ private final CompactionCallableModel compactionCallableModel;
- public CompactionCallable(String hdfsStoreLocation, CarbonLoadModel carbonLoadModel,
- Partitioner partitioner, String storeLocation, CarbonTable carbonTable, String kettleHomePath,
- Long cubeCreationTime, List<LoadMetadataDetails> loadsToMerge, SQLContext sqlContext) {
+ public CompactionCallable(CompactionCallableModel compactionCallableModel) {
- this.hdfsStoreLocation = hdfsStoreLocation;
- this.carbonLoadModel = carbonLoadModel;
- this.partitioner = partitioner;
- this.storeLocation = storeLocation;
- this.carbonTable = carbonTable;
- this.kettleHomePath = kettleHomePath;
- this.cubeCreationTime = cubeCreationTime;
- this.loadsToMerge = loadsToMerge;
- this.sqlContext = sqlContext;
+ this.compactionCallableModel = compactionCallableModel;
}
@Override public Void call() throws Exception {
- Compactor.triggerCompaction(hdfsStoreLocation, carbonLoadModel, partitioner, storeLocation,
- carbonTable, kettleHomePath, cubeCreationTime, loadsToMerge, sqlContext);
+ Compactor.triggerCompaction(compactionCallableModel);
return null;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
index 3aa66c2..f033993 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -128,7 +128,7 @@ public final class CarbonDataMergerUtil {
public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
- String mergeLoadStartTime) {
+ String mergeLoadStartTime, CompactionType compactionType) {
boolean tableStatusUpdationStatus = false;
AbsoluteTableIdentifier absoluteTableIdentifier =
@@ -176,6 +176,10 @@ public final class CarbonDataMergerUtil {
loadMetadataDetails.setLoadName(mergedLoadNumber);
loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
loadMetadataDetails.setPartitionCount("0");
+ // if this is a major compaction then set the segment as major compaction.
+ if (compactionType == CompactionType.MAJOR_COMPACTION) {
+ loadMetadataDetails.setMajorCompacted("true");
+ }
List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
@@ -514,7 +518,10 @@ public final class CarbonDataMergerUtil {
// if a segment is already merged 2 levels then it s name will become .2
// need to exclude those segments from minor compaction.
- if (segName.endsWith(".2")) {
+ // if a segment is major compacted then should not be considered for minor.
+ if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || (
+ segment.isMajorCompacted() != null && segment.isMajorCompacted()
+ .equalsIgnoreCase("true"))) {
continue;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 897759f..1dd066f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -48,6 +48,7 @@ import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
import org.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema
import org.carbondata.core.constants.CarbonCommonConstants
import org.carbondata.core.datastorage.store.impl.FileFactory
+import org.carbondata.core.load.LoadMetadataDetails
import org.carbondata.core.locks.{CarbonLockFactory, LockUsage}
import org.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.carbondata.integration.spark.merger.CompactionType
@@ -158,6 +159,11 @@ case class CompactionModel(compactionSize: Long,
carbonTable: CarbonTable,
cubeCreationTime: Long)
+case class CompactionCallableModel(hdfsStoreLocation: String, carbonLoadModel: CarbonLoadModel,
+ partitioner: Partitioner, storeLocation: String, carbonTable: CarbonTable, kettleHomePath: String,
+ cubeCreationTime: Long, loadsToMerge: util.List[LoadMetadataDetails], sqlContext: SQLContext,
+ compactionType: CompactionType)
+
object TableNewProcessor {
def apply(cm: tableModel, sqlContext: SQLContext): TableInfo = {
new TableNewProcessor(cm, sqlContext).process
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7726739..15de7bf 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -31,7 +31,8 @@ import org.apache.hadoop.mapreduce.Job
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.spark.{Logging, Partition, SparkContext, SparkEnv}
import org.apache.spark.sql.{CarbonEnv, CarbonRelation, SQLContext}
-import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, Partitioner}
+import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionCallableModel,
+CompactionModel, Partitioner}
import org.apache.spark.sql.hive.DistributionUtil
import org.apache.spark.util.{FileUtils, SplitUtils}
@@ -476,7 +477,7 @@ object CarbonDataRDDFactory extends Logging {
}
)
- val future: Future[Void] = executor.submit(new CompactionCallable(hdfsStoreLocation,
+ val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
carbonLoadModel,
partitioner,
storeLocation,
@@ -484,9 +485,13 @@ object CarbonDataRDDFactory extends Logging {
kettleHomePath,
compactionModel.cubeCreationTime,
loadsToMerge,
- sqlContext
- )
- )
+ sqlContext,
+ compactionModel.compactionType)
+
+ val future: Future[Void] = executor
+ .submit(new CompactionCallable(compactionCallableModel
+ )
+ )
futureList.add(future)
segList = CarbonDataMergerUtil
.filterOutAlreadyMergedSegments(segList, loadsToMerge)
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
index 2680d7d..7e19ded 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/Compactor.scala
@@ -19,17 +19,14 @@ package org.carbondata.spark.rdd
import scala.collection.JavaConverters._
-import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.execution.command.{CarbonMergerMapping, Partitioner}
+import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
import org.carbondata.common.logging.LogServiceFactory
import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.carbondata.core.carbon.metadata.schema.table.CarbonTable
import org.carbondata.core.constants.CarbonCommonConstants
-import org.carbondata.core.load.LoadMetadataDetails
import org.carbondata.core.util.CarbonProperties
import org.carbondata.lcm.status.SegmentStatusManager
-import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
+import org.carbondata.spark.load.CarbonLoaderUtil
import org.carbondata.spark.MergeResultImpl
import org.carbondata.spark.merger.CarbonDataMergerUtil
@@ -40,15 +37,18 @@ object Compactor {
val logger = LogServiceFactory.getLogService(Compactor.getClass.getName)
- def triggerCompaction(hdfsStoreLocation: String,
- carbonLoadModel: CarbonLoadModel,
- partitioner: Partitioner,
- storeLocation: String,
- carbonTable: CarbonTable,
- kettleHomePath: String,
- cubeCreationTime: Long,
- loadsToMerge: java.util.List[LoadMetadataDetails],
- sc: SQLContext): Unit = {
+ def triggerCompaction(compactionCallableModel: CompactionCallableModel): Unit = {
+
+ val hdfsStoreLocation = compactionCallableModel.hdfsStoreLocation
+ val partitioner = compactionCallableModel.partitioner
+ val storeLocation = compactionCallableModel.storeLocation
+ val carbonTable = compactionCallableModel.carbonTable
+ val kettleHomePath = compactionCallableModel.kettleHomePath
+ val cubeCreationTime = compactionCallableModel.cubeCreationTime
+ val loadsToMerge = compactionCallableModel.loadsToMerge
+ val sc = compactionCallableModel.sqlContext
+ val carbonLoadModel = compactionCallableModel.carbonLoadModel
+ val compactionType = compactionCallableModel.compactionType
val startTime = System.nanoTime();
val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
@@ -117,7 +117,7 @@ object Compactor {
logger.info("time taken to merge " + mergedLoadName + " is " + (endTime - startTime))
CarbonDataMergerUtil
.updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath(),
- mergedLoadName, carbonLoadModel, mergeLoadStartTime
+ mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType
)
logger
.audit("Compaction request completed for table " + carbonLoadModel
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
new file mode 100644
index 0000000..1005e83
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -0,0 +1,130 @@
+package org.carbondata.spark.testsuite.datacompaction
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
+import org.carbondata.lcm.status.SegmentStatusManager
+import org.scalatest.BeforeAndAfterAll
+
+import scala.collection.JavaConverters._
+
+/**
+ * FT for compaction scenario where major segment should not be included in minor.
+ */
+class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+ sql("drop table if exists ignoremajor")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+ sql(
+ "CREATE TABLE IF NOT EXISTS ignoremajor (country String, ID Int, date Timestamp, name " +
+ "String, " +
+ "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+ ".format'"
+ )
+
+
+ val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+ .getCanonicalPath
+ val csvFilePath1 = currentDirectory + "/src/test/resources/compaction/compaction1.csv"
+
+ val csvFilePath2 = currentDirectory + "/src/test/resources/compaction/compaction2.csv"
+ val csvFilePath3 = currentDirectory + "/src/test/resources/compaction/compaction3.csv"
+
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ // compaction will happen here.
+ sql("alter table ignoremajor compact 'major'"
+ )
+ if (checkCompactionCompletedOrNot("0.1")) {
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE ignoremajor OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE ignoremajor OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql("alter table ignoremajor compact 'minor'"
+ )
+ if (checkCompactionCompletedOrNot("2.1")) {
+ sql("alter table ignoremajor compact 'minor'"
+ )
+ }
+
+ }
+
+ }
+
+ /**
+ * Check if the compaction is completed or not.
+ *
+ * @param requiredSeg
+ * @return
+ */
+ def checkCompactionCompletedOrNot(requiredSeg: String): Boolean = {
+ var status = false
+ var noOfRetries = 0
+ while (!status && noOfRetries < 10) {
+
+ val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+ AbsoluteTableIdentifier(
+ CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+ new CarbonTableIdentifier("default", "ignoremajor", noOfRetries + "")
+ )
+ )
+ val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+ segments.foreach(seg =>
+ System.out.println( "valid segment is =" + seg)
+ )
+
+ if (!segments.contains(requiredSeg)) {
+ // wait for 2 seconds for compaction to complete.
+ System.out.println("sleping for 2 seconds.")
+ Thread.sleep(2000)
+ noOfRetries += 1
+ }
+ else {
+ status = true
+ }
+ }
+ return status
+ }
+
+ /**
+ * Test whether major compaction is not included in minor compaction.
+ */
+ test("delete merged folder and check segments") {
+ // delete merged segments
+ sql("clean files for table ignoremajor")
+
+ val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+ AbsoluteTableIdentifier(
+ CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+ new CarbonTableIdentifier("default", "ignoremajor", "rrr")
+ )
+ )
+ // merged segment should not be there
+ val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+ assert(segments.contains("0.1"))
+ assert(segments.contains("2.1"))
+ assert(!segments.contains("2"))
+ assert(!segments.contains("3"))
+
+ }
+
+ override def afterAll {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/bdc1321d/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
index e6220d8..b35bc06 100644
--- a/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
+++ b/processing/src/main/java/org/carbondata/lcm/status/SegmentStatusManager.java
@@ -107,7 +107,6 @@ public class SegmentStatusManager {
.getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
absoluteTableIdentifier.getCarbonTableIdentifier());
String dataPath = carbonTablePath.getTableStatusFilePath();
-
DataInputStream dataInputStream = null;
Gson gsonObjectToRead = new Gson();
AtomicFileOperations fileOperation =
@@ -134,7 +133,6 @@ public class SegmentStatusManager {
.equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
// check for merged loads.
if (null != loadMetadataDetails.getMergedLoadName()) {
-
if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
}