You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/14 17:50:27 UTC

[05/18] carbondata git commit: [CARBONDATA-1693][Streaming] Change SegmentStatus from String to enum

[CARBONDATA-1693][Streaming] Change SegmentStatus from String to enum

This closes #1480


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2f0959a8
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2f0959a8
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2f0959a8

Branch: refs/heads/fgdatamap
Commit: 2f0959a824b728e8f6193753767e1f58dee4017d
Parents: 1a62189
Author: Jacky Li <ja...@qq.com>
Authored: Sun Nov 12 22:50:15 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Tue Nov 14 19:02:05 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  52 ---------
 .../core/datastore/row/LoadStatusType.java      |  37 -------
 .../core/mutate/CarbonUpdateUtil.java           |  25 ++---
 .../core/mutate/SegmentUpdateDetails.java       |  11 +-
 .../reader/CarbonDeleteDeltaFileReaderImpl.java |   4 +-
 .../core/statusmanager/LoadMetadataDetails.java |  15 ++-
 .../core/statusmanager/SegmentStatus.java       | 108 +++++++++++++++++++
 .../statusmanager/SegmentStatusManager.java     |  86 +++++++--------
 .../SegmentUpdateStatusManager.java             |  25 +++--
 .../writer/CarbonDeleteDeltaWriterImpl.java     |   4 +-
 .../examples/SparkSessionExample.scala          |   3 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   8 +-
 .../hadoop/util/BlockLevelTraverser.java        |   2 +-
 .../hadoop/test/util/StoreCreator.java          |   3 +-
 .../presto/util/CarbonDataStoreCreator.scala    |   6 +-
 .../MajorCompactionIgnoreInMinorTest.scala      |  11 +-
 .../org/apache/carbondata/api/CarbonStore.scala |  20 ++--
 .../org/apache/carbondata/spark/KeyVal.scala    |  10 +-
 .../load/DataLoadProcessBuilderOnSpark.scala    |   6 +-
 .../spark/rdd/AlterTableAddColumnRDD.scala      |  11 +-
 .../spark/rdd/AlterTableDropColumnRDD.scala     |  11 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  13 +--
 .../spark/rdd/DataManagementFunc.scala          |  18 +++-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  29 +++--
 .../carbondata/spark/rdd/SparkDataMapJob.scala  |   4 -
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |   4 +-
 .../carbondata/spark/util/CommonUtil.scala      |  18 ++--
 .../spark/util/GlobalDictionaryUtil.scala       |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  69 ++++++------
 .../command/management/LoadTableCommand.scala   |   7 +-
 .../command/mutation/DeleteExecution.scala      |  28 ++---
 .../org/apache/spark/util/ShowSegments.scala    |  88 ---------------
 .../segmentreading/TestSegmentReading.scala     |   4 +-
 .../TestStreamingTableOperation.scala           |   3 +-
 .../apache/spark/util/CarbonCommandSuite.scala  |   7 +-
 .../processing/merger/CarbonDataMergerUtil.java |  42 ++++----
 .../processing/util/CarbonLoaderUtil.java       |  21 ++--
 .../processing/util/DeleteLoadFolders.java      |   6 +-
 .../processing/util/LoadMetadataUtil.java       |  47 --------
 .../carbondata/processing/StoreCreator.java     |   4 +-
 .../streaming/segment/StreamSegment.java        |   9 +-
 41 files changed, 376 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 8018c2b..5c04e8d 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -512,15 +512,6 @@ public final class CarbonCommonConstants {
    */
   public static final String ENABLE_DATA_LOADING_STATISTICS_DEFAULT = "false";
   /**
-   * IS_INT_BASED_INDEXER
-   */
-  @CarbonProperty
-  public static final String HIGH_CARDINALITY_VALUE = "high.cardinality.value";
-  /**
-   * IS_INT_BASED_INDEXER_DEFAULTVALUE
-   */
-  public static final String HIGH_CARDINALITY_VALUE_DEFAULTVALUE = "100000";
-  /**
    * CONSTANT_SIZE_TEN
    */
   public static final int CONSTANT_SIZE_TEN = 10;
@@ -528,41 +519,8 @@ public final class CarbonCommonConstants {
    * LEVEL_METADATA_FILE
    */
   public static final String LEVEL_METADATA_FILE = "levelmetadata_";
-  /**
-   * LOAD_STATUS SUCCESS
-   */
-  public static final String STORE_LOADSTATUS_SUCCESS = "Success";
-  /**
-   * LOAD_STATUS UPDATE
-   */
-  public static final String STORE_LOADSTATUS_UPDATE = "Update";
-  /**
-   * LOAD_STATUS FAILURE
-   */
-  public static final String STORE_LOADSTATUS_FAILURE = "Failure";
-  /**
-   * LOAD_STATUS PARTIAL_SUCCESS
-   */
-  public static final String STORE_LOADSTATUS_PARTIAL_SUCCESS = "Partial Success";
 
   /**
-   * STORE_LOADSTATUS_STREAMING
-   */
-  public static final String STORE_LOADSTATUS_STREAMING = "Streaming";
-
-  /**
-   * STORE_LOADSTATUS_STREAMING
-   */
-  public static final String STORE_LOADSTATUS_STREAMING_FINISH = "Streaming Finish";
-  /**
-   * LOAD_STATUS
-   */
-  public static final String CARBON_METADATA_EXTENSION = ".metadata";
-  /**
-   * LOAD_STATUS
-   */
-  public static final String CARBON_DEFAULT_STREAM_ENCODEFORMAT = "UTF-8";
-  /**
    * COMMA
    */
   public static final String COMMA = ",";
@@ -632,11 +590,6 @@ public final class CarbonCommonConstants {
    */
   public static final String DEFAULT_COMPRESSOR = "snappy";
 
-  /**
-   * MARKED_FOR_DELETION
-   */
-  public static final String MARKED_FOR_DELETE = "Marked for Delete";
-  public static final String MARKED_FOR_UPDATE = "Marked for Update";
   public static final String STRING_TYPE = "StringType";
   public static final String INTEGER_TYPE = "IntegerType";
   public static final String LONG_TYPE = "LongType";
@@ -915,11 +868,6 @@ public final class CarbonCommonConstants {
   public static final long CARBON_256MB = 256 * 1024 * 1024;
 
   /**
-   * COMPACTED is property to indicate whether seg is compacted or not.
-   */
-  public static final String COMPACTED = "Compacted";
-
-  /**
    * ZOOKEEPERLOCK TYPE
    */
   public static final String CARBON_LOCK_TYPE_ZOOKEEPER = "ZOOKEEPERLOCK";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
deleted file mode 100644
index d3c5a94..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.row;
-
-/**
- * Load status type
- */
-public enum LoadStatusType {
-
-  INSERT_OVERWRITE("Overwrite In Progress"), // if insert overwrite operation is in progress
-  IN_PROGRESS("In Progress"); // if load, insert into operation is in progress
-
-  private String message;
-
-  LoadStatusType(String message) {
-    this.message = message;
-  }
-
-  public String getMessage() {
-    return message;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 57e1544..6589ee5 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.data.BlockMappingVO;
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -145,7 +146,7 @@ public class CarbonUpdateUtil {
                   .setDeleteDeltaStartTimestamp(newBlockEntry.getDeleteDeltaStartTimestamp());
             }
             blockDetail.setDeleteDeltaEndTimestamp(newBlockEntry.getDeleteDeltaEndTimestamp());
-            blockDetail.setStatus(newBlockEntry.getStatus());
+            blockDetail.setSegmentStatus(newBlockEntry.getSegmentStatus());
             blockDetail.setDeletedRowsInBlock(newBlockEntry.getDeletedRowsInBlock());
           } else {
             // add the new details to the list.
@@ -224,7 +225,7 @@ public class CarbonUpdateUtil {
 
             // if the segments is in the list of marked for delete then update the status.
             if (segmentsToBeDeleted.contains(loadMetadata.getLoadName())) {
-              loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+              loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
               loadMetadata.setModificationOrdeletionTimesStamp(Long.parseLong(updatedTimeStamp));
             }
           }
@@ -508,9 +509,8 @@ public class CarbonUpdateUtil {
       // if this segment is valid then only we will go for delta file deletion.
       // if the segment is mark for delete or compacted then any way it will get deleted.
 
-      if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-              || segment.getLoadStatus()
-              .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+      if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
+              || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
 
         // take the list of files from this segment.
         String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
@@ -565,7 +565,7 @@ public class CarbonUpdateUtil {
           }
 
           // case 1
-          if (CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+          if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
             completeListOfDeleteDeltaFiles = updateStatusManager
                     .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
                             allSegmentFiles);
@@ -687,17 +687,8 @@ public class CarbonUpdateUtil {
     }
   }
 
-  /**
-   *
-   * @param blockStatus
-   * @return
-   */
-  public static boolean isBlockInvalid(String blockStatus) {
-    if (blockStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED) || blockStatus
-            .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
-      return true;
-    }
-    return false;
+  public static boolean isBlockInvalid(SegmentStatus blockStatus) {
+    return blockStatus == SegmentStatus.COMPACTED || blockStatus == SegmentStatus.MARKED_FOR_DELETE;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java b/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
index 6f99b3a..583e2ec 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/SegmentUpdateDetails.java
@@ -21,6 +21,7 @@ import java.io.Serializable;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 
 /**
  * This class stores the segment details of table update status file
@@ -30,7 +31,7 @@ public class SegmentUpdateDetails implements Serializable {
   private static final long serialVersionUID = 1206104914918491724L;
   private String segmentName;
   private String blockName;
-  private String status = "";
+  private SegmentStatus segmentStatus;
   private String deleteDeltaEndTimestamp = "";
   private String deleteDeltaStartTimestamp = "";
   private String actualBlockName;
@@ -74,12 +75,12 @@ public class SegmentUpdateDetails implements Serializable {
     this.deleteDeltaStartTimestamp = deleteDeltaStartTimestamp;
   }
 
-  public void setStatus(String status) {
-    this.status = status;
+  public void setSegmentStatus(SegmentStatus segmentStatus) {
+    this.segmentStatus = segmentStatus;
   }
 
-  public String getStatus() {
-    return this.status;
+  public SegmentStatus getSegmentStatus() {
+    return this.segmentStatus;
   }
 
   @Override public int hashCode() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
index b75d796..1c0b586 100644
--- a/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/reader/CarbonDeleteDeltaFileReaderImpl.java
@@ -80,7 +80,7 @@ public class CarbonDeleteDeltaFileReaderImpl implements CarbonDeleteDeltaFileRea
     StringWriter sw = new StringWriter();
     dataInputStream = FileFactory.getDataInputStream(filePath, fileType);
     inputStream = new InputStreamReader(dataInputStream,
-        CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+        CarbonCommonConstants.DEFAULT_CHARSET);
     int n = 0;
     while (-1 != (n = inputStream.read(buffer))) {
       sw.write(buffer, 0, n);
@@ -108,7 +108,7 @@ public class CarbonDeleteDeltaFileReaderImpl implements CarbonDeleteDeltaFileRea
       }
       dataInputStream = fileOperation.openForRead();
       inStream = new InputStreamReader(dataInputStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+          CarbonCommonConstants.DEFAULT_CHARSET);
       buffReader = new BufferedReader(inStream);
       deleteDeltaBlockDetails =
           gsonObjectToRead.fromJson(buffReader, DeleteDeltaBlockDetails.class);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 51a04e4..f42ca23 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -30,9 +30,16 @@ public class LoadMetadataDetails implements Serializable {
 
   private static final long serialVersionUID = 1106104914918491724L;
   private String timestamp;
-  private String loadStatus;
+
+  // For backward compatibility, this member is required to read from JSON in the table_status file
+  private SegmentStatus loadStatus;
+
+  // name of the segment
   private String loadName;
+
+  // partition count of this segment
   private String partitionCount;
+
   private String isDeleted = CarbonCommonConstants.KEYWORD_FALSE;
 
   // update delta end timestamp
@@ -94,12 +101,12 @@ public class LoadMetadataDetails implements Serializable {
     this.timestamp = getTimeStampConvertion(timestamp);;
   }
 
-  public String getLoadStatus() {
+  public SegmentStatus getSegmentStatus() {
     return loadStatus;
   }
 
-  public void setLoadStatus(String loadStatus) {
-    this.loadStatus = loadStatus;
+  public void setSegmentStatus(SegmentStatus segmentStatus) {
+    this.loadStatus = segmentStatus;
   }
 
   public String getLoadName() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
new file mode 100644
index 0000000..3b62ec9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatus.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+import com.google.gson.annotations.SerializedName;
+
+/**
+ * Status of one segment. This enum is serialized into the table_status file, so
+ * please ensure each SerializedName stays backward compatible when modifying this enum.
+ */
+public enum SegmentStatus {
+
+  /**
+   * Data load success, it is visible for read
+   */
+  @SerializedName("Success")
+  SUCCESS("Success"),
+
+  /**
+   * Data load failed
+   */
+  @SerializedName("Failure")
+  LOAD_FAILURE("Failure"),
+
+  /**
+   * Data load partial success
+   */
+  @SerializedName("Partial Success")
+  LOAD_PARTIAL_SUCCESS("Partial Success"),
+
+  /**
+   * Segment has been deleted by user or compactor
+   */
+  @SerializedName("Marked for Delete")
+  MARKED_FOR_DELETE("Marked for Delete"),
+
+  /**
+   * Segment has been updated by user
+   */
+  @SerializedName("Marked for Update")
+  MARKED_FOR_UPDATE("Marked for Update"),
+
+  /**
+   * Segment is compacted
+   */
+  @SerializedName("Compacted")
+  COMPACTED("Compacted"),
+
+  /**
+   * Insert overwrite operation is in progress
+   */
+  @SerializedName("Overwrite In Progress")  // This string can't be modified due to compatibility
+  INSERT_OVERWRITE_IN_PROGRESS("Insert Overwrite In Progress"),
+
+  /**
+   * Insert into operation is in progress
+   */
+  @SerializedName("In Progress")  // This string can't be modified due to compatibility
+  INSERT_IN_PROGRESS("Insert In Progress"),
+
+  /**
+   * Streaming ingest in progress, for streaming segment
+   */
+  @SerializedName("Streaming")
+  STREAMING("Streaming"),
+
+  /**
+   * Streaming ingest finished, for streaming segment
+   */
+  @SerializedName("Streaming Finish")
+  STREAMING_FINISH("Streaming Finish"),
+
+  /**
+   * This status is not used; it is kept here only for backward compatibility
+   */
+  @SerializedName("Update")
+  UPDATE("Update");
+
+  private String message;
+
+  SegmentStatus(String message) {
+    this.message = message;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  @Override
+  public String toString() {
+    return message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 30304d9..a42239f 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.row.LoadStatusType;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
@@ -109,7 +108,10 @@ public class SegmentStatusManager {
                     absoluteTableIdentifier.getCarbonTableIdentifier());
     String dataPath = carbonTablePath.getTableStatusFilePath();
     DataInputStream dataInputStream = null;
-    Gson gsonObjectToRead = new Gson();
+
+    // Use GSON to deserialize the load information
+    Gson gson = new Gson();
+
     AtomicFileOperations fileOperation =
             new AtomicFileOperationsImpl(dataPath, FileFactory.getFileType(dataPath));
     LoadMetadataDetails[] loadFolderDetailsArray;
@@ -117,53 +119,41 @@ public class SegmentStatusManager {
       if (FileFactory.isFileExist(dataPath, FileFactory.getFileType(dataPath))) {
         dataInputStream = fileOperation.openForRead();
         BufferedReader buffReader =
-                new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8"));
-        loadFolderDetailsArray = gsonObjectToRead.fromJson(buffReader, LoadMetadataDetails[].class);
+            new BufferedReader(new InputStreamReader(dataInputStream, "UTF-8"));
+        loadFolderDetailsArray = gson.fromJson(buffReader, LoadMetadataDetails[].class);
         //just directly iterate Array
-        for (LoadMetadataDetails loadMetadataDetails : loadFolderDetailsArray) {
-          if (CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.MARKED_FOR_UPDATE
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+        for (LoadMetadataDetails segment : loadFolderDetailsArray) {
+          if (SegmentStatus.SUCCESS == segment.getSegmentStatus() ||
+              SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus() ||
+              SegmentStatus.LOAD_PARTIAL_SUCCESS == segment.getSegmentStatus() ||
+              SegmentStatus.STREAMING == segment.getSegmentStatus() ||
+              SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
             // check for merged loads.
-            if (null != loadMetadataDetails.getMergedLoadName()) {
-              if (!listOfValidSegments.contains(loadMetadataDetails.getMergedLoadName())) {
-                listOfValidSegments.add(loadMetadataDetails.getMergedLoadName());
+            if (null != segment.getMergedLoadName()) {
+              if (!listOfValidSegments.contains(segment.getMergedLoadName())) {
+                listOfValidSegments.add(segment.getMergedLoadName());
               }
               // if merged load is updated then put it in updated list
-              if (CarbonCommonConstants.MARKED_FOR_UPDATE
-                      .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-                listOfValidUpdatedSegments.add(loadMetadataDetails.getMergedLoadName());
+              if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
+                listOfValidUpdatedSegments.add(segment.getMergedLoadName());
               }
               continue;
             }
 
-            if (CarbonCommonConstants.MARKED_FOR_UPDATE
-                    .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
+            if (SegmentStatus.MARKED_FOR_UPDATE == segment.getSegmentStatus()) {
 
-              listOfValidUpdatedSegments.add(loadMetadataDetails.getLoadName());
+              listOfValidUpdatedSegments.add(segment.getLoadName());
             }
-            if (CarbonCommonConstants.STORE_LOADSTATUS_STREAMING
-                .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                || CarbonCommonConstants.STORE_LOADSTATUS_STREAMING_FINISH
-                .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())) {
-              listOfStreamSegments.add(loadMetadataDetails.getLoadName());
+            if (SegmentStatus.STREAMING == segment.getSegmentStatus() ||
+                SegmentStatus.STREAMING_FINISH == segment.getSegmentStatus()) {
+              listOfStreamSegments.add(segment.getLoadName());
               continue;
             }
-            listOfValidSegments.add(loadMetadataDetails.getLoadName());
-          } else if ((CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.COMPACTED
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus())
-                  || CarbonCommonConstants.MARKED_FOR_DELETE
-                  .equalsIgnoreCase(loadMetadataDetails.getLoadStatus()))) {
-            listOfInvalidSegments.add(loadMetadataDetails.getLoadName());
+            listOfValidSegments.add(segment.getLoadName());
+          } else if ((SegmentStatus.LOAD_FAILURE == segment.getSegmentStatus() ||
+              SegmentStatus.COMPACTED == segment.getSegmentStatus() ||
+              SegmentStatus.MARKED_FOR_DELETE == segment.getSegmentStatus())) {
+            listOfInvalidSegments.add(segment.getLoadName());
           }
         }
       }
@@ -489,15 +479,14 @@ public class SegmentStatusManager {
 
         if (loadId.equalsIgnoreCase(loadMetadata.getLoadName())) {
           // if the segment is compacted then no need to delete that.
-          if (CarbonCommonConstants.COMPACTED
-                  .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+          if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) {
             LOG.error("Cannot delete the Segment which is compacted. Segment is " + loadId);
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
           }
-          if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())) {
+          if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
             loadFound = true;
-            loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+            loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
             loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
             LOG.info("Segment ID " + loadId + " Marked for Delete");
           }
@@ -533,15 +522,14 @@ public class SegmentStatusManager {
     for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
       Integer result = compareDateValues(loadMetadata.getLoadStartTimeAsLong(), loadStartTime);
       if (result < 0) {
-        if (CarbonCommonConstants.COMPACTED
-            .equalsIgnoreCase(loadMetadata.getLoadStatus())) {
+        if (SegmentStatus.COMPACTED == loadMetadata.getSegmentStatus()) {
           LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
               + "as the segment has been compacted.");
           continue;
         }
-        if (!CarbonCommonConstants.MARKED_FOR_DELETE.equals(loadMetadata.getLoadStatus())
-            && !LoadStatusType.IN_PROGRESS.getMessage().equals(loadMetadata.getLoadStatus())
-            && !LoadStatusType.INSERT_OVERWRITE.getMessage().equals(loadMetadata.getLoadStatus())) {
+        if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus() &&
+            SegmentStatus.INSERT_IN_PROGRESS != loadMetadata.getSegmentStatus() &&
+            SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS != loadMetadata.getSegmentStatus()) {
           loadFound = true;
           updateSegmentMetadataDetails(loadMetadata);
           LOG.info("Info: " +
@@ -594,7 +582,7 @@ public class SegmentStatusManager {
     List<LoadMetadataDetails> newListMetadata =
         new ArrayList<LoadMetadataDetails>(Arrays.asList(newMetadata));
     for (LoadMetadataDetails oldSegment : oldMetadata) {
-      if (CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oldSegment.getLoadStatus())) {
+      if (SegmentStatus.MARKED_FOR_DELETE == oldSegment.getSegmentStatus()) {
         updateSegmentMetadataDetails(newListMetadata.get(newListMetadata.indexOf(oldSegment)));
       }
     }
@@ -608,8 +596,8 @@ public class SegmentStatusManager {
    */
   public static void updateSegmentMetadataDetails(LoadMetadataDetails loadMetadata) {
     // update status only if the segment is not marked for delete
-    if (!CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(loadMetadata.getLoadStatus())) {
-      loadMetadata.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE);
+    if (SegmentStatus.MARKED_FOR_DELETE != loadMetadata.getSegmentStatus()) {
+      loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
       loadMetadata.setModificationOrdeletionTimesStamp(CarbonUpdateUtil.readCurrentTime());
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 1218231..982c598 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -325,7 +325,7 @@ public class SegmentUpdateStatusManager {
     List<String> blockNames = new ArrayList<String>();
     for (SegmentUpdateDetails block : updateDetails) {
       if (block.getSegmentName().equalsIgnoreCase(segmentName) && !CarbonUpdateUtil
-          .isBlockInvalid(block.getStatus())) {
+          .isBlockInvalid(block.getSegmentStatus())) {
         blockNames.add(block.getBlockName());
       }
     }
@@ -343,11 +343,8 @@ public class SegmentUpdateStatusManager {
 
     SegmentUpdateDetails details = getDetailsForABlock(segName, blockName);
 
-    if (details == null || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
-      return true;
-    }
+    return details == null || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus());
 
-    return false;
   }
   /**
    * Returns all delta file paths of specified block
@@ -363,8 +360,9 @@ public class SegmentUpdateStatusManager {
       String segment) {
     List<String> deleteFileList = new ArrayList<>();
     for (SegmentUpdateDetails block : updateDetails) {
-      if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) && block.getSegmentName()
-          .equalsIgnoreCase(segment) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+      if (block.getBlockName().equalsIgnoreCase(blockNameFromTuple) &&
+          block.getSegmentName().equalsIgnoreCase(segment) &&
+          !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
         final long deltaStartTimestamp = getStartTimeOfDeltaFile(extension, block);
         // If there is no delete delete file , then return null
         if (deltaStartTimestamp == 0) {
@@ -434,7 +432,7 @@ public class SegmentUpdateStatusManager {
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
           (block.getSegmentName().equalsIgnoreCase(segmentId))
-          && !CarbonUpdateUtil.isBlockInvalid((block.getStatus()))) {
+          && !CarbonUpdateUtil.isBlockInvalid((block.getSegmentStatus()))) {
         final long deltaStartTimestamp =
             getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
         final long deltaEndTimeStamp =
@@ -605,7 +603,7 @@ public class SegmentUpdateStatusManager {
 
             for (SegmentUpdateDetails blockDetails : getUpdateStatusDetails()) {
               if (blockDetails.getActualBlockName().equalsIgnoreCase(eachFile.getName())
-                  && CarbonUpdateUtil.isBlockInvalid(blockDetails.getStatus())) {
+                  && CarbonUpdateUtil.isBlockInvalid(blockDetails.getSegmentStatus())) {
                 validBlock = false;
               }
             }
@@ -699,7 +697,7 @@ public class SegmentUpdateStatusManager {
       }
       dataInputStream = fileOperation.openForRead();
       inStream = new InputStreamReader(dataInputStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT);
+          CarbonCommonConstants.DEFAULT_CHARSET);
       buffReader = new BufferedReader(inStream);
       listOfSegmentUpdateDetailsArray =
           gsonObjectToRead.fromJson(buffReader, SegmentUpdateDetails[].class);
@@ -755,7 +753,7 @@ public class SegmentUpdateStatusManager {
     try {
       dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+          CarbonCommonConstants.DEFAULT_CHARSET));
 
       String metadataInstance = gsonObjectToWrite.toJson(listOfSegmentUpdateDetailsArray);
       brWriter.write(metadataInstance);
@@ -792,8 +790,9 @@ public class SegmentUpdateStatusManager {
     SegmentUpdateDetails[] listOfSegmentUpdateDetailsArray =
         readLoadMetadata();
     for (SegmentUpdateDetails block : listOfSegmentUpdateDetailsArray) {
-      if (segmentId.equalsIgnoreCase(block.getSegmentName()) && block.getBlockName()
-          .equalsIgnoreCase(blockName) && !CarbonUpdateUtil.isBlockInvalid(block.getStatus())) {
+      if (segmentId.equalsIgnoreCase(block.getSegmentName()) &&
+          block.getBlockName().equalsIgnoreCase(blockName) &&
+          !CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
         long deleteTimestampFromStatusFile = block.getDeleteDeltaEndTimeAsLong();
         if (Long.compare(deleteTimestampFromStatusFile, cacheTimestamp) == 0) {
           return null;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
index 3bf61f1..1802515 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonDeleteDeltaWriterImpl.java
@@ -70,7 +70,7 @@ public class CarbonDeleteDeltaWriterImpl implements CarbonDeleteDeltaWriter {
       FileFactory.createNewFile(filePath, fileType);
       dataOutStream = FileFactory.getDataOutputStream(filePath, fileType);
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+          CarbonCommonConstants.DEFAULT_CHARSET));
       brWriter.write(value);
     } catch (IOException ioe) {
       LOGGER.error("Error message: " + ioe.getLocalizedMessage());
@@ -98,7 +98,7 @@ public class CarbonDeleteDeltaWriterImpl implements CarbonDeleteDeltaWriter {
       dataOutStream = FileFactory.getDataOutputStream(filePath, fileType);
       Gson gsonObjectToWrite = new Gson();
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutStream,
-          CarbonCommonConstants.CARBON_DEFAULT_STREAM_ENCODEFORMAT));
+          CarbonCommonConstants.DEFAULT_CHARSET));
       String deletedData = gsonObjectToWrite.toJson(deleteDeltaBlockDetails);
       brWriter.write(deletedData);
     } catch (IOException ioe) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
index b823be5..df1e68f 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/SparkSessionExample.scala
@@ -15,13 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql.examples
+package org.apache.carbondata.examples
 
 import java.io.File
 
 import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.util.{CleanFiles, ShowSegments}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 92ef6da..f3963ad 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -321,12 +321,10 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
     List<String> streamSegments = null;
     List<String> filteredSegmentToAccess = new ArrayList<>();
     if (getValidateSegmentsToAccess(job.getConfiguration())) {
-
       String[] segmentsToAccess = getSegmentsToAccess(job);
       Set<String> segmentToAccessSet = new HashSet<>();
-      for (String segmentToAccess : segmentsToAccess) {
-        segmentToAccessSet.add(segmentToAccess);
-      }
+      segmentToAccessSet.addAll(Arrays.asList(segmentsToAccess));
+
       // get all valid segments and set them into the configuration
       SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
       SegmentStatusManager.ValidAndInvalidSegmentsInfo segments =
@@ -903,7 +901,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       // if block is invalid then dont add the count
       SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
 
-      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
         Long blockCount = blockRowCountMapping.get(key);
         if (blockCount == null) {
           blockCount = 0L;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
index b814468..89a4a9a 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/BlockLevelTraverser.java
@@ -74,7 +74,7 @@ public class BlockLevelTraverser {
       // if block is invalid then dont add the count
       SegmentUpdateDetails details = updateStatusManager.getDetailsForABlock(key);
 
-      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getStatus())) {
+      if (null == details || !CarbonUpdateUtil.isBlockInvalid(details.getSegmentStatus())) {
         blockRowMap.put(key, rowCount);
         count++;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index dd48cd9..3b5d736 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -61,6 +61,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
@@ -455,7 +456,7 @@ public class StoreCreator {
       String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
     LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
     loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
-    loadMetadataDetails.setLoadStatus("SUCCESS");
+    loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS);
     loadMetadataDetails.setLoadName(String.valueOf(0));
     loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index e932213..e8f5d6d 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -25,7 +25,7 @@ import java.util.{ArrayList, Date, List, UUID}
 
 import scala.collection.JavaConversions._
 
-import com.google.gson.Gson
+import com.google.gson.{Gson, GsonBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
@@ -46,7 +46,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.{SchemaEvolution, SchemaEvolutionEntry}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier, ColumnIdentifier}
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.writer.sortindex.{CarbonDictionarySortIndexWriter, CarbonDictionarySortIndexWriterImpl, CarbonDictionarySortInfo, CarbonDictionarySortInfoPreparator}
@@ -523,7 +523,7 @@ object CarbonDataStoreCreator {
     try {
       val loadMetadataDetails: LoadMetadataDetails = new LoadMetadataDetails()
       loadMetadataDetails.setLoadEndTime(System.currentTimeMillis())
-      loadMetadataDetails.setLoadStatus("SUCCESS")
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
       loadMetadataDetails.setLoadName(String.valueOf(0))
       loadMetadataDetails.setLoadStartTime(
         loadMetadataDetails.getTimeStamp(readCurrentTime()))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index b07c82a..111ede7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -19,12 +19,13 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.datastore.TableSegmentUniqueIdentifier
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.CacheClient
 import org.apache.spark.sql.test.util.QueryTest
@@ -118,8 +119,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted.
-    assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED))
-
+    assertResult(SegmentStatus.COMPACTED)(segs(3).getSegmentStatus)
   }
 
   /**
@@ -140,10 +140,9 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted for segment 2.
-    assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED))
+    assertResult(SegmentStatus.COMPACTED)(segs(3).getSegmentStatus)
     // for segment 0.1 . should get deleted
-    assert(segs(2).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE))
-
+    assertResult(SegmentStatus.MARKED_FOR_DELETE)(segs(2).getSegmentStatus)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 054f778..cf6a032 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -18,7 +18,6 @@
 package org.apache.carbondata.api
 
 import java.lang.Long
-import java.text.SimpleDateFormat
 
 import scala.collection.JavaConverters._
 
@@ -29,7 +28,7 @@ import org.apache.spark.sql.types.TimestampType
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
@@ -47,7 +46,6 @@ object CarbonStore {
       tableFolderPath: String): Seq[Row] = {
     val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(tableFolderPath)
     if (loadMetadataDetailsArray.nonEmpty) {
-      val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
       var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { (l1, l2) =>
         java.lang.Double.parseDouble(l1.getLoadName) > java.lang.Double.parseDouble(l2.getLoadName)
       }
@@ -66,11 +64,12 @@ object CarbonStore {
       loadMetadataDetailsSortedArray
         .filter(_.getVisibility.equalsIgnoreCase("true"))
         .map { load =>
-          val mergedTo = if (load.getMergedLoadName != null) {
-            load.getMergedLoadName
-          } else {
-            ""
-          }
+          val mergedTo =
+            if (load.getMergedLoadName != null) {
+              load.getMergedLoadName
+            } else {
+              "NA"
+            }
 
           val startTime =
             if (load.getLoadStartTime == CarbonCommonConstants.SEGMENT_LOAD_TIME_DEFAULT) {
@@ -88,11 +87,10 @@ object CarbonStore {
 
           Row(
             load.getLoadName,
-            load.getLoadStatus,
+            load.getSegmentStatus.getMessage,
             startTime,
             endTime,
-            mergedTo
-          )
+            mergedTo)
         }.toSeq
     } else {
       Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index 7cf8c88..bbf242e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -28,7 +28,7 @@ package org.apache.carbondata.spark
 import org.apache.spark.sql.execution.command.ExecutionErrors
 
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 
 
 trait Value[V] extends Serializable {
@@ -76,13 +76,13 @@ class updateResultImpl
 }
 
 trait DeleteDelataResult[K, V] extends Serializable {
-  def getKey(key: String, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V)
+  def getKey(key: SegmentStatus, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V)
 }
 
 class DeleteDelataResultImpl
-  extends DeleteDelataResult[String, (SegmentUpdateDetails, ExecutionErrors)] {
-  override def getKey(key: String,
-      value: (SegmentUpdateDetails, ExecutionErrors)): (String, (SegmentUpdateDetails,
+  extends DeleteDelataResult[SegmentStatus, (SegmentUpdateDetails, ExecutionErrors)] {
+  override def getKey(key: SegmentStatus,
+      value: (SegmentUpdateDetails, ExecutionErrors)): (SegmentStatus, (SegmentUpdateDetails,
     ExecutionErrors)) = {
     (key, value)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index bdf9a71..ca2ca2e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -31,7 +31,7 @@ import org.apache.spark.storage.StorageLevel
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.row.CarbonRow
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, FailureCauses}
 import org.apache.carbondata.processing.loading.csvinput.{CSVInputFormat, StringArrayWritable}
@@ -144,14 +144,14 @@ object DataLoadProcessBuilderOnSpark {
       val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE +
         "Partial_Success"
       val loadMetadataDetails = new LoadMetadataDetails()
-      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       executionErrors.failureCauses = FailureCauses.BAD_RECORDS
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     } else {
       val uniqueLoadStatusId = model.getTableName + CarbonCommonConstants.UNDERSCORE + "Success"
       val loadMetadataDetails = new LoadMetadataDetails()
-      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       Array((uniqueLoadStatusId, (loadMetadataDetails, executionErrors)))
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
index 2fb72f7..2fbc7a3 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableAddColumnRDD.scala
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.spark.util.GlobalDictionaryUtil
@@ -52,7 +53,7 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: CarbonTableIdentifier,
     carbonStorePath: String)
-  extends CarbonRDD[(Int, String)](sc, Nil) {
+  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
 
   val lockType: String = CarbonProperties.getInstance.getProperty(CarbonCommonConstants.LOCK_TYPE,
     CarbonCommonConstants.CARBON_LOCK_TYPE_HDFS)
@@ -64,10 +65,10 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
   }
 
   override def internalCompute(split: Partition,
-      context: TaskContext): Iterator[(Int, String)] = {
+      context: TaskContext): Iterator[(Int, SegmentStatus)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-    val iter = new Iterator[(Int, String)] {
+    val status = SegmentStatus.SUCCESS
+    val iter = new Iterator[(Int, SegmentStatus)] {
       try {
         val columnSchema = split.asInstanceOf[AddColumnPartition].columnSchema
         // create dictionary file if it is a dictionary column
@@ -111,7 +112,7 @@ class AlterTableAddColumnRDD[K, V](sc: SparkContext,
         }
       }
 
-      override def next(): (Int, String) = {
+      override def next(): (Int, SegmentStatus) = {
         (split.index, status)
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
index fde5cd6..5b0889f 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableDropColumnRDD.scala
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
+import org.apache.carbondata.core.statusmanager.SegmentStatus
 
 /**
  * This is a partitioner class for dividing the newly added columns into partitions
@@ -49,7 +50,7 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
     @transient newColumns: Seq[ColumnSchema],
     carbonTableIdentifier: CarbonTableIdentifier,
     carbonStorePath: String)
-  extends CarbonRDD[(Int, String)](sc, Nil) {
+  extends CarbonRDD[(Int, SegmentStatus)](sc, Nil) {
 
   override def getPartitions: Array[Partition] = {
     newColumns.zipWithIndex.map { column =>
@@ -58,10 +59,10 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
   }
 
   override def internalCompute(split: Partition,
-      context: TaskContext): Iterator[(Int, String)] = {
+      context: TaskContext): Iterator[(Int, SegmentStatus)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-    val iter = new Iterator[(Int, String)] {
+    val status = SegmentStatus.SUCCESS
+    val iter = new Iterator[(Int, SegmentStatus)] {
       try {
         val columnSchema = split.asInstanceOf[DropColumnPartition].columnSchema
         if (columnSchema.hasEncoding(Encoding.DICTIONARY) &&
@@ -87,7 +88,7 @@ class AlterTableDropColumnRDD[K, V](sc: SparkContext,
         }
       }
 
-      override def next(): (Int, String) = {
+      override def next(): (Int, SegmentStatus) = {
         (split.index, status)
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index b2e0c47..c1c7bf1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
 import org.apache.carbondata.core.service.{CarbonCommonFactory, PathService}
+import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -333,17 +334,17 @@ class CarbonBlockDistinctValuesCombineRDD(
 class CarbonGlobalDictionaryGenerateRDD(
     prev: RDD[(Int, ColumnDistinctValues)],
     model: DictionaryLoadModel)
-  extends CarbonRDD[(Int, String)](prev) {
+  extends CarbonRDD[(Int, SegmentStatus)](prev) {
 
   override def getPartitions: Array[Partition] = firstParent[(Int, ColumnDistinctValues)].partitions
 
   override def internalCompute(split: Partition,
-      context: TaskContext): Iterator[(Int, String)] = {
+      context: TaskContext): Iterator[(Int, SegmentStatus)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.STORE_LOCATION,
       model.hdfsLocation)
-    var status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-    val iter = new Iterator[(Int, String)] {
+    var status = SegmentStatus.SUCCESS
+    val iter = new Iterator[(Int, SegmentStatus)] {
       var dictionaryForDistinctValueLookUp: Dictionary = _
       var dictionaryForSortIndexWriting: Dictionary = _
       var dictionaryForDistinctValueLookUpCleared: Boolean = false
@@ -452,7 +453,7 @@ class CarbonGlobalDictionaryGenerateRDD(
       } catch {
         case dictionaryException: NoRetryException =>
           LOGGER.error(dictionaryException)
-          status = CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+          status = SegmentStatus.LOAD_FAILURE
         case ex: Exception =>
           LOGGER.error(ex)
           throw ex
@@ -485,7 +486,7 @@ class CarbonGlobalDictionaryGenerateRDD(
         }
       }
 
-      override def next(): (Int, String) = {
+      override def next(): (Int, SegmentStatus) = {
         (split.index, status)
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index cbdb336..3a74dfa 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -30,10 +30,10 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders, LoadMetadataUtil}
+import org.apache.carbondata.processing.util.{CarbonLoaderUtil, DeleteLoadFolders}
 import org.apache.carbondata.spark.compaction.CompactionCallable
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -162,13 +162,25 @@ object DataManagementFunc {
     }
   }
 
+  /** True when any visible segment in the table status is MARKED_FOR_DELETE or COMPACTED. */
+  private def isLoadDeletionRequired(metaDataLocation: String): Boolean = {
+    // The null guard on the readLoadMetadata result is kept exactly as the original had it.
+    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
+    // `exists` short-circuits on the first hit without a non-local `return` inside a closure.
+    details != null && details.exists { oneRow =>
+      val status = oneRow.getSegmentStatus
+      (SegmentStatus.MARKED_FOR_DELETE == status || SegmentStatus.COMPACTED == status) &&
+      oneRow.getVisibility.equalsIgnoreCase("true")
+    }
+  }
+
   def deleteLoadsAndUpdateMetadata(
       dbName: String,
       tableName: String,
       storePath: String,
       isForceDeletion: Boolean,
       carbonTable: CarbonTable): Unit = {
-    if (LoadMetadataUtil.isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
+    if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
       val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
       val carbonTableStatusLock =
         CarbonLockFactory.getCarbonLockObj(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 74f7528..6f44a0d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses}
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
@@ -212,7 +212,7 @@ class NewCarbonDataLoadRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
 
         val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
           .USE_PREFETCH_WHILE_LOADING, CarbonCommonConstants.USE_PREFETCH_WHILE_LOADING_DEFAULT)
@@ -233,12 +233,12 @@ class NewCarbonDataLoadRDD[K, V](
           recordReaders)
       } catch {
         case e: NoRetryException =>
-          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
           executionErrors.failureCauses = FailureCauses.BAD_RECORDS
           executionErrors.errorMsg = e.getMessage
           logInfo("Bad Record Found")
         case e: Exception =>
-          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+          loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
           logInfo("DataLoad failure", e)
           LOGGER.error(e)
           throw e
@@ -247,8 +247,7 @@ class NewCarbonDataLoadRDD[K, V](
         CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
-        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-          .equals(loadMetadataDetails.getLoadStatus)) {
+        if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
             .printStatisticsInfo(model.getPartitionId)
         }
@@ -346,7 +345,7 @@ class NewDataFrameLoaderRDD[K, V](
       try {
 
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
         carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         carbonLoadModel.setPreFetch(false)
@@ -376,12 +375,12 @@ class NewDataFrameLoaderRDD[K, V](
         executor.execute(model, loader.storeLocation, recordReaders.toArray)
       } catch {
         case e: NoRetryException =>
-          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
           executionErrors.failureCauses = FailureCauses.BAD_RECORDS
           executionErrors.errorMsg = e.getMessage
           logInfo("Bad Record Found")
         case e: Exception =>
-          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+          loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
           logInfo("DataLoad failure", e)
           LOGGER.error(e)
           throw e
@@ -390,8 +389,7 @@ class NewDataFrameLoaderRDD[K, V](
         CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
-        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-          .equals(loadMetadataDetails.getLoadStatus)) {
+        if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
             .printStatisticsInfo(model.getPartitionId)
         }
@@ -540,7 +538,7 @@ class PartitionTableDataLoaderRDD[K, V](
       try {
 
         loadMetadataDetails.setPartitionCount(partitionID)
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
         carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
         carbonLoadModel.setPreFetch(false)
@@ -561,12 +559,12 @@ class PartitionTableDataLoaderRDD[K, V](
         executor.execute(model, loader.storeLocation, recordReaders)
       } catch {
         case e: NoRetryException =>
-          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
           executionErrors.failureCauses = FailureCauses.BAD_RECORDS
           executionErrors.errorMsg = e.getMessage
           logInfo("Bad Record Found")
         case e: Exception =>
-          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+          loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
           logInfo("DataLoad For Partition Table failure", e)
           LOGGER.error(e)
           throw e
@@ -575,8 +573,7 @@ class PartitionTableDataLoaderRDD[K, V](
         CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false, false)
         // in case of failure the same operation will be re-tried several times.
         // So print the data load statistics only in case of non failure case
-        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
-          .equals(loadMetadataDetails.getLoadStatus)) {
+        if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
             .printStatisticsInfo(model.getPartitionId)
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
index fbe9377..600cd80 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/SparkDataMapJob.scala
@@ -27,8 +27,6 @@ import org.apache.hadoop.mapreduce.{InputSplit, Job, TaskAttemptID, TaskType}
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
 import org.apache.spark.{Partition, SparkContext, TaskContext, TaskKilledException}
 
-import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf
 import org.apache.carbondata.hadoop.api.{DataMapJob, DistributableDataMapFormat}
@@ -69,8 +67,6 @@ class DataMapPruneRDD(sc: SparkContext,
 
   override def internalCompute(split: Partition,
       context: TaskContext): Iterator[ExtendedBlocklet] = {
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val status = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
     val attemptId = new TaskAttemptID(jobTrackerId, id, TaskType.MAP, split.index, 0)
     val attemptContext = new TaskAttemptContextImpl(new Configuration(), attemptId)
     val inputSplit = split.asInstanceOf[DataMapRDDPartition].inputSplit

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index eb07240..b1dfc01 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.Row
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.DataLoadExecutor
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -55,7 +55,7 @@ object UpdateDataLoad {
       // Intialize to set carbon properties
       loader.initialize()
 
-      loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+      loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
       new DataLoadExecutor().execute(carbonLoadModel,
         loader.storeLocation,
         recordReaders.toArray)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 84294ff..b51d0f0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -39,7 +39,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.datastore.row.LoadStatusType
 import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
@@ -48,7 +47,7 @@ import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -518,11 +517,12 @@ object CommonUtil {
       storePath: String,
       insertOverwrite: Boolean): Unit = {
     val newLoadMetaEntry = new LoadMetadataDetails
-    val status: String = if (insertOverwrite) {
-      LoadStatusType.INSERT_OVERWRITE.getMessage
+    val status = if (insertOverwrite) {
+      SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
     } else {
-      LoadStatusType.IN_PROGRESS.getMessage
+      SegmentStatus.INSERT_IN_PROGRESS
     }
+
     // reading the start time of data load.
     val loadStartTime = CarbonUpdateUtil.readCurrentTime
     model.setFactTimeStamp(loadStartTime)
@@ -548,7 +548,7 @@ object CommonUtil {
       model: CarbonLoadModel): Unit = {
     // in case if failure the load status should be "Marked for delete" so that it will be taken
     // care during clean up
-    val loadStatus = CarbonCommonConstants.MARKED_FOR_DELETE
+    val loadStatus = SegmentStatus.MARKED_FOR_DELETE
     // always the last entry in the load metadata details will be the current load entry
     val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1)
     CarbonLoaderUtil
@@ -799,9 +799,9 @@ object CommonUtil {
                       var loadInprogressExist = false
                       val staleFolders: Seq[CarbonFile] = Seq()
                       listOfLoadFolderDetailsArray.foreach { load =>
-                        if (load.getLoadStatus.equals(LoadStatusType.IN_PROGRESS.getMessage) ||
-                            load.getLoadStatus.equals(LoadStatusType.INSERT_OVERWRITE.getMessage)) {
-                          load.setLoadStatus(CarbonCommonConstants.MARKED_FOR_DELETE)
+                        if (load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
+                            load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+                          load.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE)
                           staleFolders :+ FileFactory.getCarbonFile(
                             carbonTablePath.getCarbonDataDirectoryPath("0", load.getLoadName))
                           loadInprogressExist = true

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2f0959a8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index 840e8ae..3bb8d94 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonDimension, ColumnSchema}
 import org.apache.carbondata.core.reader.CarbonDictionaryReader
 import org.apache.carbondata.core.service.CarbonCommonFactory
+import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
@@ -388,12 +389,12 @@ object GlobalDictionaryUtil {
   private def checkStatus(carbonLoadModel: CarbonLoadModel,
       sqlContext: SQLContext,
       model: DictionaryLoadModel,
-      status: Array[(Int, String)]) = {
+      status: Array[(Int, SegmentStatus)]) = {
     var result = false
     val tableName = model.table.getTableName
     status.foreach { x =>
       val columnName = model.primDimensions(x._1).getColName
-      if (CarbonCommonConstants.STORE_LOADSTATUS_FAILURE.equals(x._2)) {
+      if (SegmentStatus.LOAD_FAILURE == x._2) {
         result = true
         LOGGER.error(s"table:$tableName column:$columnName generate global dictionary file failed")
       }